from flask import Flask, render_template, request, redirect, url_for, send_from_directory, abort, flash,send_file import os import pandas as pd import pymysql import io import mysql.connector from werkzeug.utils import secure_filename from AppCode.FileHandler import FileHandler from AppCode.DocumentHandler import DocumentHandler from config import db_config from AppCode.Config import DBConfig from AppCode.ITRHandler import ITRHandler from AppCode.AOHandler import AOHandler from AppCode.CITHandler import CITHandler from AppCode.ITATHandler import ITATHandler from AppCode.YearGet import YearGet app = Flask(__name__) app.secret_key="secret1234" app.config['UPLOAD_FOLDER'] = FileHandler.UPLOAD_FOLDER # welcome page @app.route('/') def welcome(): return render_template('welcome.html') # Dashboard page @app.route('/dashboard') def index(): return render_template('index.html') # Ensure folder exists def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in FileHandler.ALLOWED_EXTENSIONS # Upload File route @app.route('/upload', methods=['GET', 'POST']) def upload_file(): if request.method == 'POST': FileHandler.CHeckExistingOrCreateNewUploadFolder() docHandler = DocumentHandler() docHandler.Upload(request=request) return redirect(url_for('view_documents')) return render_template('upload.html') # View all documents with filters @app.route('/documents') def view_documents(): docHandler = DocumentHandler() docHandler.View(request=request) return render_template('view_docs.html', documents=docHandler.documents, years=docHandler.years) # Upload file documents @app.route('/uploads/') def uploaded_file(filename): mode = request.args.get('mode', 'view') filepath = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(filename)) if not os.path.exists(filepath): abort(404) file_ext = filename.rsplit('.', 1)[-1].lower() # --- View Mode --- if mode == 'view': if file_ext == 'pdf': return send_file(filepath, mimetype='application/pdf') elif file_ext in ['xls', 'xlsx']: # Excel cannot be rendered in-browser by Flask; trigger download instead return send_file(filepath, as_attachment=False, download_name=filename, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') else: return abort(415) # Unsupported type for viewing return send_file(filepath, as_attachment=True) # @app.route('/itr', methods=['GET', 'POST']) # def itr_form(): # if request.method == 'POST': # data = {key: request.form.get(key, 0) for key in request.form} # conn = mysql.connector.connect(**db_config) # cursor = conn.cursor() # query = """ # INSERT INTO itr ( # year, gross_total_income, disallowance_14a, disallowance_37, # deduction_80ia_business, deduction_80ia_misc, deduction_80ia_other, # deduction_sec37_disallowance, deduction_80g, net_taxable_income, # tax_30_percent, tax_book_profit_18_5, tax_payable, surcharge_12, # edu_cess_3, total_tax_payable, mat_credit, interest_234c, # total_tax, advance_tax, tds, tcs, tax_on_assessment, refund # ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) # """ # values = tuple([ # int(data.get('year', 0)) # ] + [ # float(data.get(col, 0)) for col in [ # 'gross_total_income', 'disallowance_14a', 'disallowance_37', # 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other', # 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income', # 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12', # 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c', # 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund' # ] # ]) # cursor.execute(query, values) # conn.commit() # flash("ITR record deleted successfully!", "success") # cursor.close() # conn.close() # return redirect(url_for('index')) # return render_template('itr_form.html') ## =============================================== ## ITR (Income Tax Return) Routes ## =============================================== ## 1. READ/DISPLAY all ITR records @app.route('/itr_records') def display_itr(): itr = ITRHandler() records = itr.get_all_itr() itr.close() return render_template('display_itr.html', records=records) ## 2. CREATE/ADD a new ITR record @app.route('/itr/add', methods=['GET', 'POST']) def add_itr(): if request.method == 'POST': itr = ITRHandler() itr.add_itr(request.form) itr.close() flash("ITR record added successfully!", "success") return redirect(url_for('display_itr')) return render_template('add_itr.html') ## 4. DELETE an ITR record @app.route('/itr/delete/', methods=['POST']) def delete_itr(id): itr = ITRHandler() itr.delete_itr_by_id(id=id) itr.close() return redirect(url_for('display_itr')) ## 3. UPDATE an existing ITR record @app.route('/itr/update/', methods=['GET', 'POST']) def update_itr(id): itr = ITRHandler() if request.method == 'POST': data = {k: request.form.get(k, 0) for k in request.form} print("itr data-->",data) itr.update(id, data=data) itr.close() return redirect(url_for('display_itr')) record = itr.get_itr_by_id(id) itr.close() return render_template('update_itr.html', record=record) ## 3. UPDATE an existing ITR record # @app.route('/itr/update/', methods=['GET', 'POST']) # def update_itr(id): # conn = get_db_connection() # if request.method == 'POST': # cursor = conn.cursor() # columns = [ # 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37', # 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other', # 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income', # 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12', # 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c', # 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund' # ] # # Create the "SET column = %s" part of the query # set_clause = ', '.join([f"{col} = %s" for col in columns]) # query = f"UPDATE itr SET {set_clause} WHERE id = %s" # values = [request.form.get(col, 0) for col in columns] # values.append(id) # Add the ID for the WHERE clause at the end # cursor.execute(query, tuple(values)) # conn.commit() # cursor.close() # conn.close() # return redirect(url_for('display_itr')) # # For a GET request, fetch the existing data and show it in the form # cursor = conn.cursor(dictionary=True) # cursor.execute("SELECT * FROM itr WHERE id = %s", (id,)) # record = cursor.fetchone() # cursor.close() # conn.close() # return render_template('update_itr.html', record=record) ## =============================================== ## AO (Assessing Officer) Routes ## =============================================== # 1. DISPLAY all AO records @app.route('/ao_records') def display_ao(): ao = AOHandler() ao_records = ao.get_all_ao() ao.close() return render_template('display_ao.html', ao_records=ao_records) # 2. ADD a new AO record @app.route('/ao/add', methods=['GET', 'POST']) def add_ao(): if request.method == 'POST': ao = AOHandler() ao.add_ao(request.form) ao.close() flash("AO record added successfully!", "success") return redirect(url_for('display_ao')) return render_template('add_ao.html') # 3. UPDATE AO record @app.route('/ao/update/', methods=['GET', 'POST']) def update_ao(id): ao = AOHandler() record = ao.get_ao_by_id(id) if not record: return "AO record not found", 404 if request.method == 'POST': data = request.form.to_dict() ao.update_ao(id, data) ao.close() flash("AO record updated successfully!", "success") return redirect(url_for('display_ao')) ao.close() return render_template("update_ao.html", record=record) # 4. DELETE AO record safely @app.route('/ao/delete/', methods=['POST']) def delete_ao(id): ao = AOHandler() ao.delete_ao_by_id(id=id) ao.close() flash("AO deleted successfully!", "success") return redirect(url_for('display_ao')) # 3. UPDATE AO record # @app.route('/ao/update/', methods=['GET', 'POST']) # def update_ao(id): # conn = get_db_connection() # cursor = conn.cursor(dictionary=True) # cursor.execute("SELECT * FROM ao WHERE id = %s", (id,)) # ao_record = cursor.fetchone() # if not ao_record: # cursor.close() # conn.close() # return "AO record not found", 404 # if request.method == 'POST': # columns = [ # 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37', # 'deduction_80ia_business', 'deduction_sec37_disallowance', 'deduction_80g', # 'net_taxable_income', 'tax_30_percent', 'tax_book_profit_18_5', # 'surcharge_12', 'edu_cess_3', 'total_tax_payable', 'mat_credit', # 'interest_234c', 'total_tax', 'advance_tax', 'tds', 'tcs', # 'tax_on_assessment', 'refund' # ] # values = [request.form.get(col, 0) for col in columns] # set_clause = ", ".join([f"{col}=%s" for col in columns]) # query = f"UPDATE ao SET {set_clause} WHERE id=%s" # cursor.execute(query, tuple(values) + (id,)) # conn.commit() # cursor.close() # conn.close() # flash("AO record updated successfully!", "success") # return redirect(url_for('display_ao')) # cursor.close() # conn.close() # return render_template('update_ao.html', record=ao_record) ## ======================================================= ## CIT (Commissioner of Income Tax) Routes ## ======================================================= # DISPLAY all CIT records @app.route('/cit_records') def display_cit(): cit = CITHandler() cit_records = cit.get_all_cit() cit.close() return render_template('display_cit.html', cit_records=cit_records) @app.route('/cit/add', methods=['GET', 'POST']) def add_cit(): if request.method == 'POST': cit = CITHandler() cit.add_cit(request.form) cit.close() flash("CIT record added successfully!", "success") return redirect(url_for('display_cit')) return render_template('add_cit.html') @app.route('/cit/delete/', methods=['POST']) def delete_cit(id): cit = CITHandler() cit.delete_cit(id) cit.close() flash("CIT record deleted successfully!", "success") return redirect(url_for('display_cit')) @app.route('/cit/update/', methods=['GET', 'POST']) def update_cit(id): cit = CITHandler() record = cit.get_cit_by_id(id) if not record: cit.close() return "CIT record not found", 404 if request.method == 'POST': data = {k: request.form.get(k, 0) for k in request.form} cit.update_cit(id, data) cit.close() return redirect(url_for('display_cit')) cit.close() return render_template('update_cit.html', record=record) # @app.route('/cit/update/', methods=['GET', 'POST']) # def update_cit(id): conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM cit WHERE id=%s", (id,)) record = cursor.fetchone() if not record: cursor.close() conn.close() return "CIT record not found", 404 if request.method == 'POST': columns = [ "year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance", "deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5", "tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit", "interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund" ] values = [request.form.get(col, 0) for col in columns] set_clause = ", ".join([f"{col}=%s" for col in columns]) query = f"UPDATE cit SET {set_clause} WHERE id=%s" cursor.execute(query, tuple(values)+(id,)) conn.commit() cursor.close() conn.close() return redirect(url_for('display_cit')) cursor.close() conn.close() return render_template('add_cit.html', record=record) # DISPLAY all CIT records # @app.route('/cit_records') # def display_cit(): # conn = get_db_connection() # cursor = conn.cursor(dictionary=True) # cursor.execute("SELECT * FROM cit ORDER BY year DESC, id DESC") # cit_records = cursor.fetchall() # cursor.close() # conn.close() # return render_template('display_cit.html', cit_records=cit_records) # ADD a new CIT record # @app.route('/cit/add', methods=['GET', 'POST']) # def add_cit(): # if request.method == 'POST': # conn = get_db_connection() # cursor = conn.cursor() # columns = [ # "year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance", # "deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5", # "tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit", # "interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund" # ] # values = [request.form.get(col, 0) for col in columns] # query = f"INSERT INTO cit ({', '.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})" # cursor.execute(query, tuple(values)) # conn.commit() # flash("ITAT record added successfully!", "success") # cursor.close() # conn.close() # return redirect(url_for('display_cit')) # return render_template('add_cit.html') # @app.route('/cit/delete/', methods=['POST']) # def delete_cit(id): # try: # conn = get_db_connection() # cursor = conn.cursor() # cursor.execute("DELETE FROM cit WHERE id=%s", (id,)) # conn.commit() # flash("ITR record deleted successfully!", "success") # except Exception as err: # print(f"Error deleting CIT record: {err}") # finally: # cursor.close() # conn.close() # return redirect(url_for('display_cit')) ## ======================================================= ## ITAT (Income Tax Appellate Tribunal) Routes ## ======================================================= # DISPLAY all ITAT records @app.route('/itat_records') def display_itat(): itat = ITATHandler() records = itat.get_all_itat() itat.close() return render_template('display_itat.html', records=records) # ADD a new ITAT record # @app.route('/itat/add', methods=['GET', 'POST']) # def add_itat(): # cit = CITHandler() # cit_records = cit.get_all_cit() # cit.close() # if request.method == 'POST': # data = { # "cit_id": request.form.get("cit_id"), # "year": request.form.get("year"), # "mat_tax_credit": request.form.get("mat_tax_credit"), # "surcharge": request.form.get("surcharge"), # "cess": request.form.get("cess"), # "total_credit": request.form.get("total_credit") # } # itat = ITATHandler() # itat.add_itat(data) # itat.close() # flash("ITAT Record Added Successfully!", "success") # return redirect(url_for('display_itat')) # return render_template('add_itat.html', cit_records=cit_records) @app.route('/itat/delete/', methods=['POST']) def delete_itat(id): itat = ITATHandler() itat.delete_itat_by_id(id) itat.close() flash("ITAT Record Deleted!", "success") return redirect(url_for('display_itat')) @app.route('/itat/update/', methods=['GET', 'POST']) def update_itat(id): itat = ITATHandler() record = itat.get_itat_by_id(id) if not record: flash("Record Not Found!", "danger") return redirect(url_for('display_itat')) if request.method == 'POST': data = { "year": request.form.get("year"), "mat_tax_credit": request.form.get("mat_tax_credit"), "surcharge": request.form.get("surcharge"), "cess": request.form.get("cess"), "total_credit": request.form.get("total_credit") } itat.update_itat(id, data) itat.close() flash("ITAT Record Updated!", "success") return redirect(url_for('display_itat')) itat.close() return render_template('update_itat.html', record=record) # DISPLAY all ITAT records # @app.route('/itat_records') # def display_itat(): # conn = get_db_connection() # cursor = conn.cursor(dictionary=True) # # Querying the 'itat' table # cursor.execute("SELECT * FROM itat ORDER BY year DESC, id DESC") # records = cursor.fetchall() # cursor.close() # conn.close() # # Rendering the 'display_itat.html' template # return render_template('display_itat.html', records=records) # ADD a new ITAT record # @app.route('/itat/add', methods=['GET', 'POST']) # def add_itat(): # conn = get_db_connection() # cursor = conn.cursor(dictionary=True) # # Fetch all CIT records to choose from # cursor.execute("SELECT id, year FROM cit ORDER BY year DESC") # cit_records = cursor.fetchall() # if request.method == 'POST': # cit_id = request.form.get('cit_id') # selected parent CIT id # columns = ['id', 'year','mat_tax_credit', 'surcharge', 'cess', 'total_credit'] # values = [cit_id, # request.form.get('year', 0), # request.form.get('mat_tax_credit', 0), # request.form.get('surcharge', 0), # request.form.get('cess', 0), # request.form.get('total_credit', 0)] # query = f"INSERT INTO itat ({', '.join(columns)}) VALUES ({', '.join(['%s'] * len(columns))})" # cursor.execute(query, tuple(values)) # conn.commit() # cursor.close() # conn.close() # flash("ITAT record added successfully!", "success") # return redirect(url_for('display_itat')) # cursor.close() # conn.close() # return render_template('add_itat.html', cit_records=cit_records) @app.route('/itat/add', methods=['GET', 'POST']) def add_itat(): itat = ITATHandler() if request.method == 'POST': data = {k: request.form.get(k, 0) for k in request.form} itat.add_itat(data) itat.close() flash("ITAT record added successfully!", "success") return redirect(url_for('display_itat')) itat.close() return render_template('add_itat.html') # @app.route('/itat/update/', methods=['GET', 'POST']) # def update_itat(id): # conn = get_db_connection() # cursor = conn.cursor(dictionary=True) # # Fetch the existing record # cursor.execute("SELECT * FROM itat WHERE id=%s", (id,)) # record = cursor.fetchone() # if not record: # cursor.close() # conn.close() # flash("ITAT record not found!", "danger") # return redirect(url_for('display_itat')) # if request.method == 'POST': # columns = ['year', 'mat_tax_credit', 'surcharge', 'cess', 'total_credit'] # values = [request.form.get(col, 0) for col in columns] # set_clause = ", ".join([f"{col}=%s" for col in columns]) # query = f"UPDATE itat SET {set_clause} WHERE id=%s" # cursor.execute(query, tuple(values) + (id,)) # conn.commit() # cursor.close() # conn.close() # flash("ITAT record updated successfully!", "success") # return redirect(url_for('display_itat')) # cursor.close() # conn.close() # # Render a template with existing values filled in # return render_template('update_itat.html', record=record) # @app.route('/itat/delete/', methods=['POST']) # def delete_itat(id): # try: # conn = get_db_connection() # cursor = conn.cursor() # cursor.execute("DELETE FROM itat WHERE id=%s", (id,)) # conn.commit() # flash("ITAT record deleted successfully!", "success") # except Exception as err: # flash(f"Error deleting ITAT: {err}", "danger") # finally: # cursor.close() # conn.close() # return redirect(url_for('display_itat')) # (You will also need to add update_itat and delete_itat functions later) @app.route('/cit', methods=['GET', 'POST']) def cit_form(): if request.method == 'POST': data = {key: request.form.get(key, 0) for key in request.form} conn = mysql.connector.connect(**db_config) cursor = conn.cursor() query = """ INSERT INTO cit ( year, gross_total_income, deduction_80ia_business, deduction_sec37_disallowance, deduction_80g, net_taxable_income, tax_30_percent, tax_book_profit_18_5, tax_payable, surcharge_12, edu_cess_3, total_tax_payable, mat_credit, interest_234c, total_tax, advance_tax, tds, tcs, tax_on_assessment, refund ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ values = ( data.get('year'), # Include 'year' as the first value float(data.get('gross_total_income', 0)), float(data.get('deduction_80ia_business', 0)), float(data.get('deduction_sec37_disallowance', 0)), float(data.get('deduction_80g', 0)), float(data.get('net_taxable_income', 0)), float(data.get('tax_30_percent', 0)), float(data.get('tax_book_profit_18_5', 0)), float(data.get('tax_payable', 0)), float(data.get('surcharge_12', 0)), float(data.get('edu_cess_3', 0)), float(data.get('total_tax_payable', 0)), float(data.get('mat_credit', 0)), float(data.get('interest_234c', 0)), float(data.get('total_tax', 0)), float(data.get('advance_tax', 0)), float(data.get('tds', 0)), float(data.get('tcs', 0)), float(data.get('tax_on_assessment', 0)), float(data.get('refund', 0)) ) cursor.execute(query, values) conn.commit() cursor.close() conn.close() return redirect(url_for('index')) return render_template('cit_form.html') @app.route('/itat', methods=['GET', 'POST']) def itat_form(): if request.method == 'POST': mat_tax_credit = request.form['mat_tax_credit'] surcharge = request.form['surcharge'] cess = request.form['cess'] total_credit = request.form['total_credit'] year=request.form['year'] conn = mysql.connector.connect(**db_config) cursor = conn.cursor() cursor.execute(""" INSERT INTO itat (year, mat_tax_credit, surcharge, cess, total_credit) VALUES (%s,%s, %s, %s, %s) """, (year,mat_tax_credit, surcharge, cess, total_credit)) conn.commit() cursor.close() conn.close() return redirect(url_for('index')) return render_template('itat_form.html') def get_db_connection(): connection = mysql.connector.connect(**db_config) return connection @app.route('/reports') def reports(): return render_template("reports.html") # Itr report download by year @app.route('/itr_report', methods=['GET']) def itr_report(): yearGetter = YearGet() selected_year = request.args.get('year') if selected_year: itr = ITRHandler() output = itr.itr_report_download(selected_year) itr.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"ITR_Report_{selected_year}.xlsx" ) else: years = yearGetter.get_year_by_model("GetITRYears") yearGetter.close() return render_template("itr_reports.html", years=years) # Ao report download by year @app.route('/ao_report', methods=['GET']) def ao_report(): yearGetter = YearGet() selected_year = request.args.get('year') if selected_year: ao = AOHandler() output = ao.ao_report_download(selected_year) ao.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", as_attachment=True, download_name=f"AO_Report_{selected_year}.xlsx" ) else: years = yearGetter.get_year_by_model("GetAOYears") yearGetter.close() return render_template("ao_reports.html", years=years) # Cit report download by year @app.route('/cit_report', methods=['GET']) def cit_report(): selected_year = request.args.get('year') yearGetter = YearGet() if selected_year: cit = CITHandler() output = cit.cit_report_download(selected_year) cit.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"CIT_Report_{selected_year}_Vertical.xlsx" ) else: years = yearGetter.get_year_by_model("GetCITYears") yearGetter.close() return render_template("cit_reports.html", years=years) # Itat report download by year @app.route('/itat_report', methods=['GET']) def itat_report(): selected_year = request.args.get('year') yearGetter = YearGet() if selected_year: itat = ITATHandler() output = itat.itat_report_download(selected_year) itat.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"ITAT_Report_{selected_year}_Vertical.xlsx" ) else: # Use stored procedure for years years = yearGetter.get_year_by_model("GetITATYears") yearGetter.close() return render_template("itat_reports.html", years=years) # @app.route('/itr/reports', methods=['GET', 'POST']) # def itr_reports(): # yearGetter = YearGet() # if request.method == "POST": # selected_year = request.form.get("year") # itr=ITRHandler() # yearGetter.close() # return redirect(url_for("itr_report_result", year=selected_year)) # # GET method → fetch all distinct years through procedure # years = yearGetter.get_year_by_model("GetITRYears") # yearGetter.close() # print("---- year --",years) # return render_template("itr_reports.html", years=years) # @app.route('/ao_report', methods=['GET']) # def ao_report(): # selected_year = request.args.get('year') # connection = pymysql.connect(**db_config) # try: # if selected_year: # query = "SELECT * FROM ao WHERE year = %s" # df = pd.read_sql(query, connection, params=[selected_year]) # if df.empty: # return "No records found for the selected year." # # Transpose the DataFrame: rows → fields, columns → records # df_transposed = df.transpose() # df_transposed.insert(0, 'Field', df_transposed.index) # # Rename columns to "Record 1", "Record 2", ... # for i in range(1, df_transposed.shape[1]): # df_transposed.rename(columns={df_transposed.columns[i]: f"Record {i}"}, inplace=True) # df_transposed.reset_index(drop=True, inplace=True) # output = io.BytesIO() # with pd.ExcelWriter(output, engine='xlsxwriter') as writer: # df_transposed.to_excel(writer, index=False, sheet_name='AO_Vertical') # # Optional: Adjust formatting # workbook = writer.book # worksheet = writer.sheets['AO_Vertical'] # worksheet.set_column(0, 0, 30) # Widen 'Field' column # output.seek(0) # return send_file( # output, # mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # as_attachment=True, # download_name=f"AO_Report_{selected_year}.xlsx" # ) # else: # with connection.cursor() as cursor: # cursor.execute("SELECT DISTINCT year FROM ao ORDER BY year DESC") # years = [row[0] for row in cursor.fetchall()] # return render_template("ao_reports.html", years=years) # finally: # connection.close() # @app.route('/cit_report', methods=['GET']) # def cit_report(): # selected_year = request.args.get('year') # connection = pymysql.connect(**db_config) # try: # if selected_year: # # Fetch data from the `cit` table for the selected year # query = "SELECT * FROM cit WHERE year = %s" # df = pd.read_sql(query, connection, params=[selected_year]) # output = io.BytesIO() # with pd.ExcelWriter(output, engine='xlsxwriter') as writer: # workbook = writer.book # # Write each row vertically on a separate sheet or below one another # for i, (_, row) in enumerate(df.iterrows(), start=1): # # Convert the row to vertical format # vertical_df = pd.DataFrame(row).reset_index() # vertical_df.columns = ['Field', 'Value'] # # Write each vertical entry below the previous (e.g., block by block) # start_row = (i - 1) * (len(vertical_df) + 3) # 3-row gap between entries # vertical_df.to_excel(writer, sheet_name='CIT_Report', index=False, startrow=start_row) # output.seek(0) # return send_file( # output, # mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # as_attachment=True, # download_name=f"CIT_Report_{selected_year}_Vertical.xlsx" # ) # else: # # Render dropdown for year selection # with connection.cursor() as cursor: # cursor.execute("SELECT DISTINCT year FROM cit ORDER BY year DESC") # years = [row[0] for row in cursor.fetchall()] # return render_template("cit_reports.html", years=years) # finally: # connection.close() # @app.route('/itat_report', methods=['GET']) # def itat_report(): # selected_year = request.args.get('year') # connection = pymysql.connect(**db_config) # try: # if selected_year: # query = "SELECT * FROM itat WHERE year = %s" # df = pd.read_sql(query, connection, params=[selected_year]) # output = io.BytesIO() # with pd.ExcelWriter(output, engine='xlsxwriter') as writer: # df.T.to_excel(writer, header=False, sheet_name='ITAT_Report') # output.seek(0) # return send_file( # output, # mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # as_attachment=True, # download_name=f"ITAT_Report_{selected_year}_Vertical.xlsx" # ) # else: # with connection.cursor() as cursor: # cursor.execute("SELECT DISTINCT year FROM itat ORDER BY year DESC") # years = [row[0] for row in cursor.fetchall()] # return render_template("itat_reports.html", years=years) # finally: # connection.close() # @app.route('/itr_report_download', methods=['GET']) # def itr_report_download(): # connection = pymysql.connect(**db_config) # try: # selected_year = request.args.get('year') # if selected_year: # query = "SELECT * FROM itr WHERE year = %s" # df = pd.read_sql(query, connection, params=[selected_year]) # output = io.BytesIO() # with pd.ExcelWriter(output, engine='xlsxwriter') as writer: # df.to_excel(writer, index=False, sheet_name=f"ITR {selected_year}") # output.seek(0) # return send_file( # output, # download_name=f"ITR_Report_{selected_year}.xlsx", # as_attachment=True, # mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' # ) # else: # # If no year is selected, show dropdown # with connection.cursor() as cursor: # cursor.execute("SELECT DISTINCT year FROM itr ORDER BY year DESC") # years = [row[0] for row in cursor.fetchall()] # return render_template('itr_reports.html', years=years) # finally: # connection.close() @app.route('/download/') def download_report(doc_id): conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM documents WHERE id = %s", (doc_id,)) document = cursor.fetchone() conn.close() if not document: return "Document not found", 404 file_path = os.path.join('static', 'uploads', document['filename']) # adjust as per your storage return send_from_directory(directory='static/uploads', path=document['filename'], as_attachment=True) @app.route('/summary_report', methods=['GET']) def summary_report(): year = request.args.get('year') if not year: connection = pymysql.connect(**db_config) try: years = set() for table in ['itr', 'ao', 'cit', 'itat']: df = pd.read_sql(f"SELECT DISTINCT year FROM {table}", connection) years.update(int(y) for y in df['year'].dropna().tolist()) return render_template('summary_reports.html', years=sorted(years), message="Please select a year to download.") finally: connection.close() connection = pymysql.connect(**db_config) try: stages = ['itr', 'ao', 'cit', 'itat'] stage_data = {} for stage in stages: query = f"SELECT * FROM {stage} WHERE year = %s" df = pd.read_sql(query, connection, params=[year]) stage_data[stage.upper()] = df def safe_get(df, col): return df[col].values[0] if col in df.columns and not df.empty else '-' particulars = [ "Gross Total Income", "Add: Disallowance u/s 14A", "Add: Disallowance u/s 37", "GTI as per", "Less: Deduction u/s 80IA", "Less: Deduction u/s 80G", "Net Taxable Income", "Tax @ 30%", "Tax @ 18.5% on Book Profit", "Surcharge @ 12%", "Education Cess @ 3%", "Total Tax Payable", "Less: MAT Credit", "Net Tax", "Add: Interest u/s 234C", "Total Tax", "Advance Tax", "TDS", "TCS", "SAT", "Tax on Regular Assessment", "Refund" ] columns = [ 'gross_total_income', 'disallowance_14a', 'disallowance_37', 'gti', 'deduction_80ia', 'deduction_80g', 'net_taxable_income', 'tax_30', 'book_profit_tax', 'surcharge_12', 'education_cess', 'total_tax', 'mat_credit', 'net_tax', 'interest_234c', 'total_tax_payable', 'advance_tax', 'tds', 'tcs', 'sat', 'tax_regular', 'refund' ] data = { "Particulars": particulars, "ITR": [safe_get(stage_data['ITR'], col) for col in columns], "AO": [safe_get(stage_data['AO'], col) for col in columns], "CIT(A)": [safe_get(stage_data['CIT'], col) for col in columns], "ITAT": [safe_get(stage_data['ITAT'], col) for col in columns], } df = pd.DataFrame(data) # Export to Excel with formatting output = io.BytesIO() with pd.ExcelWriter(output, engine='xlsxwriter') as writer: df.to_excel(writer, index=False, sheet_name=f'AY {year}') workbook = writer.book worksheet = writer.sheets[f'AY {year}'] # Format definitions header_format = workbook.add_format({ 'bold': True, 'text_wrap': True, 'valign': 'middle', 'align': 'center', 'bg_color': '#007bff', 'font_color': 'white', 'border': 1 }) cell_format = workbook.add_format({ 'border': 1, 'valign': 'top', 'align': 'center', }) # Apply formats for col_num, value in enumerate(df.columns): worksheet.write(0, col_num, value, header_format) # Auto column width max_len = max(df[value].astype(str).map(len).max(), len(str(value))) + 2 worksheet.set_column(col_num, col_num, max_len) # Format data rows for row_num in range(1, len(df) + 1): for col_num in range(len(df.columns)): worksheet.write(row_num, col_num, df.iloc[row_num - 1, col_num], cell_format) output.seek(0) return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"Summary_Report_{year}.xlsx" ) finally: connection.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=5003, debug=True)