"""Flask entry point: app setup plus the upload, document and record routes."""

import io
import os

import mysql.connector
import pandas as pd
import pymysql
from flask import (
    Flask,
    abort,
    flash,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    url_for,
)
from werkzeug.utils import secure_filename

from AppCode.AOHandler import AOHandler
from AppCode.CITHandler import CITHandler
from AppCode.Config import DBConfig
from AppCode.DocumentHandler import DocumentHandler
from AppCode.FileHandler import FileHandler
from AppCode.ITATHandler import ITATHandler
from AppCode.ITRHandler import ITRHandler
from AppCode.LoginAuth import LoginAuth
from AppCode.YearGet import YearGet

# Server
app = Flask(__name__)
# Prefer a secret supplied through the environment. The literal fallback keeps
# existing deployments working but should not be relied on in production.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "secret1234")
app.config['UPLOAD_FOLDER'] = FileHandler.UPLOAD_FOLDER

auth = LoginAuth()
app.register_blueprint(auth.bp)

# MIME types for the file formats that may be served in "view" mode.
_VIEWABLE_MIMETYPES = {
    'pdf': 'application/pdf',
    'xls': 'application/vnd.ms-excel',
    'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
}


# welcome page
@app.route('/')
@auth.login_required
def welcome():
    """Render the landing page (login required)."""
    return render_template('index.html')


# Dashboard page
@app.route('/dashboard')
@auth.login_required
def index():
    """Render the dashboard, which uses the same template as the landing page."""
    return render_template('index.html')


# Check a file name against the allowed upload extensions
def allowed_file(filename):
    """Return True if *filename* has an extension in FileHandler.ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in FileHandler.ALLOWED_EXTENSIONS


# NOTE(review): unlike '/' and '/dashboard', the upload/document/record routes
# in this section are not wrapped in @auth.login_required - confirm that this
# is intentional.

# Upload File route
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    """Show the upload form (GET) or store the submitted document (POST)."""
    if request.method != 'POST':
        return render_template('upload.html')

    FileHandler.CHeckExistingOrCreateNewUploadFolder()
    docHandler = DocumentHandler()
    docHandler.Upload(request=request)
    return redirect(url_for('view_documents'))


# View all documents with filters
@app.route('/documents')
def view_documents():
    """List documents; filtering is delegated to DocumentHandler.View."""
    docHandler = DocumentHandler()
    docHandler.View(request=request)
    return render_template(
        'view_docs.html',
        documents=docHandler.documents,
        years=docHandler.years,
    )


# Serve an uploaded document
@app.route('/uploads/<filename>')
def uploaded_file(filename):
    """Serve a file from the upload folder.

    Query parameter ``mode`` (default ``view``) selects inline display; any
    other value sends the file as an attachment. Responds 404 when the file
    does not exist and 415 when the type cannot be viewed inline.
    """
    mode = request.args.get('mode', 'view')

    # Use the sanitized name for every check so the path and the extension
    # test always refer to the same file.
    safe_name = secure_filename(filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
    # An empty sanitized name would resolve to the folder itself, so require
    # a regular file rather than mere existence.
    if not safe_name or not os.path.isfile(filepath):
        abort(404)

    # --- View Mode ---
    if mode == 'view':
        file_ext = safe_name.rsplit('.', 1)[-1].lower()
        mimetype = _VIEWABLE_MIMETYPES.get(file_ext)
        if mimetype is None:
            abort(415)  # Unsupported type for viewing
        # Sent inline; whether a spreadsheet is displayed or saved is left to
        # the browser.
        return send_file(
            filepath,
            mimetype=mimetype,
            as_attachment=False,
            download_name=safe_name,
        )

    # --- Download Mode ---
    return send_file(filepath, as_attachment=True)


## ===============================================
## ITR (Income Tax Return) Routes
## ===============================================

## 1. READ/DISPLAY all ITR records
@app.route('/itr_records')
def display_itr():
    """Render every ITR record."""
    itr = ITRHandler()
    try:
        records = itr.get_all_itr()
    finally:
        # Release the handler even when the query raises.
        itr.close()
    return render_template('display_itr.html', records=records)
## 2. CREATE/ADD a new ITR record
@app.route('/itr/add', methods=['GET', 'POST'])
def add_itr():
    """Show the ITR form (GET) or insert the submitted record (POST)."""
    if request.method != 'POST':
        return render_template('add_itr.html')

    itr = ITRHandler()
    try:
        itr.add_itr(request.form)
    finally:
        itr.close()
    flash("ITR record added successfully!", "success")
    return redirect(url_for('display_itr'))


## 4. DELETE an ITR record
# NOTE(review): the <int:id> converters in this section assume numeric record
# ids - confirm against the table definitions.
@app.route('/itr/delete/<int:id>', methods=['POST'])
def delete_itr(id):
    """Delete the ITR record with the given id and return to the list."""
    itr = ITRHandler()
    try:
        itr.delete_itr_by_id(id=id)
    finally:
        itr.close()
    return redirect(url_for('display_itr'))


## 3. UPDATE an existing ITR record
@app.route('/itr/update/<int:id>', methods=['GET', 'POST'])
def update_itr(id):
    """Show the edit form for an ITR record (GET) or save the changes (POST)."""
    itr = ITRHandler()
    try:
        if request.method == 'POST':
            itr.update(id, data=request.form.to_dict())
            return redirect(url_for('display_itr'))
        record = itr.get_itr_by_id(id)
    finally:
        # Runs on every exit path, including the early return above.
        itr.close()

    if not record:
        # Mirrors the not-found handling of the AO and CIT update routes.
        return "ITR record not found", 404
    return render_template('update_itr.html', record=record)


## ===============================================
## AO (Assessing Officer) Routes
## ===============================================

# 1. DISPLAY all AO records
@app.route('/ao_records')
def display_ao():
    """Render every AO record."""
    ao = AOHandler()
    try:
        ao_records = ao.get_all_ao()
    finally:
        ao.close()
    return render_template('display_ao.html', ao_records=ao_records)


# 2. ADD a new AO record
@app.route('/ao/add', methods=['GET', 'POST'])
def add_ao():
    """Show the AO form (GET) or insert the submitted record (POST)."""
    if request.method != 'POST':
        return render_template('add_ao.html')

    ao = AOHandler()
    try:
        ao.add_ao(request.form)
    finally:
        ao.close()
    flash("AO record added successfully!", "success")
    return redirect(url_for('display_ao'))


# 3. UPDATE AO record
@app.route('/ao/update/<int:id>', methods=['GET', 'POST'])
def update_ao(id):
    """Show the edit form for an AO record (GET) or save the changes (POST).

    Responds 404 when no record with the given id exists.
    """
    ao = AOHandler()
    try:
        record = ao.get_ao_by_id(id)
        if not record:
            # The handler is still released by the finally clause below.
            return "AO record not found", 404

        if request.method == 'POST':
            ao.update_ao(id, request.form.to_dict())
            flash("AO record updated successfully!", "success")
            return redirect(url_for('display_ao'))
    finally:
        ao.close()
    return render_template("update_ao.html", record=record)
# 4. DELETE AO record safely
# NOTE(review): the <int:id> converters in this section assume numeric record
# ids - confirm against the table definitions.
@app.route('/ao/delete/<int:id>', methods=['POST'])
def delete_ao(id):
    """Delete the AO record with the given id and return to the list."""
    ao = AOHandler()
    try:
        ao.delete_ao_by_id(id=id)
    finally:
        ao.close()
    flash("AO deleted successfully!", "success")
    return redirect(url_for('display_ao'))


## =======================================================
## CIT (Commissioner of Income Tax) Routes
## =======================================================

# 1. DISPLAY all CIT records
@app.route('/cit_records')
def display_cit():
    """Render every CIT record."""
    cit = CITHandler()
    try:
        cit_records = cit.get_all_cit()
    finally:
        cit.close()
    return render_template('display_cit.html', cit_records=cit_records)


# 2. ADD a new CIT record
@app.route('/cit/add', methods=['GET', 'POST'])
def add_cit():
    """Show the CIT form (GET) or insert the submitted record (POST)."""
    if request.method != 'POST':
        return render_template('add_cit.html')

    cit = CITHandler()
    try:
        cit.add_cit(request.form)
    finally:
        cit.close()
    flash("CIT record added successfully!", "success")
    return redirect(url_for('display_cit'))


# 3. DELETE a CIT record by id
@app.route('/cit/delete/<int:id>', methods=['POST'])
def delete_cit(id):
    """Delete the CIT record with the given id and return to the list."""
    cit = CITHandler()
    try:
        cit.delete_cit(id)
    finally:
        cit.close()
    flash("CIT record deleted successfully!", "success")
    return redirect(url_for('display_cit'))


# 4. UPDATE a CIT record by id
@app.route('/cit/update/<int:id>', methods=['GET', 'POST'])
def update_cit(id):
    """Show the edit form for a CIT record (GET) or save the changes (POST).

    Responds 404 when no record with the given id exists.
    """
    cit = CITHandler()
    try:
        record = cit.get_cit_by_id(id)
        if not record:
            return "CIT record not found", 404

        if request.method == 'POST':
            cit.update_cit(id, request.form.to_dict())
            return redirect(url_for('display_cit'))
    finally:
        cit.close()
    return render_template('update_cit.html', record=record)


## =======================================================
## ITAT (Income Tax Appellate Tribunal) Routes
## =======================================================

# 1. DISPLAY all ITAT records
@app.route('/itat_records')
def display_itat():
    """Render every ITAT record."""
    itat = ITATHandler()
    try:
        records = itat.get_all_itat()
    finally:
        itat.close()
    return render_template('display_itat.html', records=records)


# 2. ADD a new ITAT record
@app.route('/itat/add', methods=['GET', 'POST'])
def add_itat():
    """Show the ITAT form (GET) or insert the submitted record (POST)."""
    if request.method != 'POST':
        # No database access is needed just to render the empty form.
        return render_template('add_itat.html')

    itat = ITATHandler()
    try:
        itat.add_itat(request.form.to_dict())
    finally:
        itat.close()
    flash("ITAT record added successfully!", "success")
    return redirect(url_for('display_itat'))


# 3. UPDATE an ITAT record by id
@app.route('/itat/update/<int:id>', methods=['GET', 'POST'])
def update_itat(id):
    """Show the edit form for an ITAT record (GET) or save the changes (POST).

    Redirects to the list with a flash message when the record is missing.
    """
    itat = ITATHandler()
    try:
        record = itat.get_itat_by_id(id)
        if not record:
            # The handler is still released by the finally clause below.
            flash("Record Not Found!", "danger")
            return redirect(url_for('display_itat'))

        if request.method == 'POST':
            # Only these form fields are forwarded to the handler.
            field_names = (
                "year",
                "mat_tax_credit",
                "surcharge",
                "cess",
                "total_credit",
            )
            data = {name: request.form.get(name) for name in field_names}
            itat.update_itat(id, data)
            flash("ITAT Record Updated!", "success")
            return redirect(url_for('display_itat'))
    finally:
        itat.close()
    return render_template('update_itat.html', record=record)


# 4. DELETE an ITAT record by id
@app.route('/itat/delete/<int:id>', methods=['POST'])
def delete_itat(id):
    """Delete the ITAT record with the given id and return to the list."""
    itat = ITATHandler()
    try:
        itat.delete_itat_by_id(id)
    finally:
        itat.close()
    flash("ITAT Record Deleted!", "success")
    return redirect(url_for('display_itat'))


## =======================================================
## All Report Routes
## =======================================================

_XLSX_MIMETYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'


def _report_years(model_name):
    """Return the years reported by YearGet for *model_name*, closing the getter."""
    year_getter = YearGet()
    try:
        return year_getter.get_year_by_model(model_name)
    finally:
        year_getter.close()


def _xlsx_report_response(output, download_name):
    """Send *output* as an .xlsx attachment, or a plain message when it is None."""
    if output is None:
        return "No records found for the selected year."
    return send_file(
        output,
        mimetype=_XLSX_MIMETYPE,
        as_attachment=True,
        download_name=download_name,
    )


# report page
@app.route('/reports')
def reports():
    """Render the report index page."""
    return render_template("reports.html")


# ITR report download by year
@app.route('/itr_report', methods=['GET'])
def itr_report():
    """Download the ITR report for ``?year=``, or show the year picker without it."""
    selected_year = request.args.get('year')
    if not selected_year:
        # The year lookup is only opened on the branch that uses it.
        years = _report_years("GetITRYears")
        return render_template("itr_reports.html", years=years)

    itr = ITRHandler()
    try:
        output = itr.itr_report_download(selected_year)
    finally:
        itr.close()
    return _xlsx_report_response(output, f"ITR_Report_{selected_year}.xlsx")


# AO report download by year
@app.route('/ao_report', methods=['GET'])
def ao_report():
    """Download the AO report for ``?year=``, or show the year picker without it."""
    selected_year = request.args.get('year')
    if not selected_year:
        years = _report_years("GetAOYears")
        return render_template("ao_reports.html", years=years)

    ao = AOHandler()
    try:
        output = ao.ao_report_download(selected_year)
    finally:
        ao.close()
    return _xlsx_report_response(output, f"AO_Report_{selected_year}.xlsx")


# CIT report download by year
@app.route('/cit_report', methods=['GET'])
def cit_report():
    """Download the CIT report for ``?year=``, or show the year picker without it."""
    selected_year = request.args.get('year')
    if not selected_year:
        years = _report_years("GetCITYears")
        return render_template("cit_reports.html", years=years)

    cit = CITHandler()
    try:
        output = cit.cit_report_download(selected_year)
    finally:
        cit.close()
    return _xlsx_report_response(
        output, f"CIT_Report_{selected_year}_Vertical.xlsx"
    )


# ITAT report download by year
@app.route('/itat_report', methods=['GET'])
def itat_report():
    """Download the ITAT report for ``?year=``, or show the year picker without it."""
    selected_year = request.args.get('year')
    if not selected_year:
        years = _report_years("GetITATYears")
        return render_template("itat_reports.html", years=years)

    itat = ITATHandler()
    try:
        output = itat.itat_report_download(selected_year)
    finally:
        itat.close()
    return _xlsx_report_response(
        output, f"ITAT_Report_{selected_year}_Vertical.xlsx"
    )


# summary report
@app.route('/summary_report', methods=['GET'])
def summary_report():
    """Return whatever DocumentHandler.Summary_report builds for this request."""
    docHandler = DocumentHandler()
    return docHandler.Summary_report(request=request)


# Check via AJAX whether a year already exists in a table.
@app.route('/check_year', methods=['POST'])
def check_year():
    """Report whether the given table has at least one row for the given year.

    Expects a JSON body with ``table`` and ``year``. Returns
    ``{"exists": bool}``, or a 400 error when the table name is missing or is
    not a plain identifier.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    table_name = payload.get("table")
    year = payload.get("year")

    # A table name cannot be bound as a query parameter, so it is validated
    # before being interpolated: only letters, digits and underscores pass.
    # NOTE(review): an explicit allow-list of table names would be stricter.
    if not isinstance(table_name, str) or not table_name.isidentifier():
        return {"error": "Invalid table name"}, 400

    query = f"SELECT COUNT(*) FROM {table_name} WHERE year = %s"
    conn = DBConfig.get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, (year,))
            result = cursor.fetchone()[0]
        finally:
            cursor.close()
    finally:
        conn.close()

    return {"exists": result > 0}


# run
if __name__ == '__main__':
    # The interactive debugger allows code execution from the browser, so it
    # is off unless explicitly requested: the server listens on all interfaces.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=5003, debug=debug_enabled)