Files
IncomeTaxSystem/main.py

1046 lines
36 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, send_from_directory, abort, flash,send_file
import os
import pandas as pd
import pymysql
import io
import mysql.connector
from werkzeug.utils import secure_filename
from AppCode.FileHandler import FileHandler
from AppCode.DocumentHandler import DocumentHandler
from config import db_config
from AppCode.Config import DBConfig
from AppCode.ITRHandler import ITRHandler
from AppCode.AOHandler import AOHandler
from AppCode.CITHandler import CITHandler
from AppCode.ITATHandler import ITATHandler
from AppCode.YearGet import YearGet
app = Flask(__name__)
# Session-signing key. Prefer the SECRET_KEY environment variable so the real
# key is not committed with the source; the literal is kept only as a
# fallback so existing deployments keep working unchanged.
app.secret_key = os.environ.get("SECRET_KEY", "secret1234")
app.config['UPLOAD_FOLDER'] = FileHandler.UPLOAD_FOLDER
# Welcome page
@app.route('/')
def welcome():
    """Render the landing page (``welcome.html``)."""
    page = render_template('welcome.html')
    return page
# Dashboard page
@app.route('/dashboard')
def index():
    """Render the dashboard (``index.html``)."""
    page = render_template('index.html')
    return page
# Check that a file name carries a permitted extension
def allowed_file(filename):
    """Return True when *filename* has an extension in FileHandler.ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared in lower case.
    A name without any dot is rejected.
    """
    if '.' not in filename:
        return False
    _, extension = filename.rsplit('.', 1)
    return extension.lower() in FileHandler.ALLOWED_EXTENSIONS
# Upload file route
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    """Show the upload form on GET; hand the submitted request to DocumentHandler on POST.

    After a POST the client is redirected to the document listing.
    """
    if request.method != 'POST':
        return render_template('upload.html')

    FileHandler.CHeckExistingOrCreateNewUploadFolder()
    handler = DocumentHandler()
    handler.Upload(request=request)
    return redirect(url_for('view_documents'))
# View all documents with filters
@app.route('/documents')
def view_documents():
    """Render the document list.

    DocumentHandler.View receives the request and populates the handler's
    ``documents`` and ``years`` attributes, which are passed to the template.
    """
    handler = DocumentHandler()
    handler.View(request=request)
    return render_template(
        'view_docs.html',
        documents=handler.documents,
        years=handler.years,
    )
# Serve an uploaded document, either inline ("view") or as a download
@app.route('/uploads/<filename>')
def uploaded_file(filename):
    """Send a file from the upload folder.

    Query parameter ``mode`` (default ``'view'``):
      * ``view`` - PDFs and Excel files are sent without the attachment
        disposition; any other extension is rejected with 415.
      * anything else - the file is sent as an attachment.

    Responds 404 when the sanitised name does not refer to a regular file.
    """
    # MIME type per Excel extension; legacy .xls is not an OOXML document.
    excel_mimetypes = {
        'xls': 'application/vnd.ms-excel',
        'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    }

    mode = request.args.get('mode', 'view')
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(filename))
    # isfile rather than exists: secure_filename() can return '' (for example
    # for "..."), which would make filepath the upload directory itself and
    # crash send_file instead of producing a 404.
    if not os.path.isfile(filepath):
        abort(404)

    file_ext = filename.rsplit('.', 1)[-1].lower()

    # --- View mode ---
    if mode == 'view':
        if file_ext == 'pdf':
            return send_file(filepath, mimetype='application/pdf')
        if file_ext in excel_mimetypes:
            # Sent inline; browsers cannot render Excel, so they will
            # normally fall back to saving the file.
            return send_file(
                filepath,
                as_attachment=False,
                download_name=filename,
                mimetype=excel_mimetypes[file_ext],
            )
        abort(415)  # Unsupported type for viewing

    # --- Download mode ---
    return send_file(filepath, as_attachment=True)
# @app.route('/itr', methods=['GET', 'POST'])
# def itr_form():
# if request.method == 'POST':
# data = {key: request.form.get(key, 0) for key in request.form}
# conn = mysql.connector.connect(**db_config)
# cursor = conn.cursor()
# query = """
# INSERT INTO itr (
# year, gross_total_income, disallowance_14a, disallowance_37,
# deduction_80ia_business, deduction_80ia_misc, deduction_80ia_other,
# deduction_sec37_disallowance, deduction_80g, net_taxable_income,
# tax_30_percent, tax_book_profit_18_5, tax_payable, surcharge_12,
# edu_cess_3, total_tax_payable, mat_credit, interest_234c,
# total_tax, advance_tax, tds, tcs, tax_on_assessment, refund
# ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
# """
# values = tuple([
# int(data.get('year', 0))
# ] + [
# float(data.get(col, 0)) for col in [
# 'gross_total_income', 'disallowance_14a', 'disallowance_37',
# 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
# 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
# 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
# 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
# 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
# ]
# ])
# cursor.execute(query, values)
# conn.commit()
# flash("ITR record deleted successfully!", "success")
# cursor.close()
# conn.close()
# return redirect(url_for('index'))
# return render_template('itr_form.html')
## ===============================================
## ITR (Income Tax Return) Routes
## ===============================================
## 1. READ/DISPLAY all ITR records
@app.route('/itr_records')
def display_itr():
    """Fetch every ITR record through ITRHandler and render the listing page."""
    handler = ITRHandler()
    rows = handler.get_all_itr()
    handler.close()
    return render_template('display_itr.html', records=rows)
## 2. CREATE/ADD a new ITR record
@app.route('/itr/add', methods=['GET', 'POST'])
def add_itr():
    """Show the ITR entry form on GET; pass the submitted form to ITRHandler on POST."""
    if request.method != 'POST':
        return render_template('add_itr.html')

    handler = ITRHandler()
    handler.add_itr(request.form)
    handler.close()
    flash("ITR record added successfully!", "success")
    return redirect(url_for('display_itr'))
## 4. DELETE an ITR record
@app.route('/itr/delete/<int:id>', methods=['POST'])
def delete_itr(id):
    """Delete the ITR record with the given id, then return to the listing."""
    handler = ITRHandler()
    handler.delete_itr_by_id(id=id)
    handler.close()
    listing_url = url_for('display_itr')
    return redirect(listing_url)
## 3. UPDATE an existing ITR record
@app.route('/itr/update/<int:id>', methods=['GET', 'POST'])
def update_itr(id):
    """Show the edit form for one ITR record on GET; apply the submitted values on POST."""
    handler = ITRHandler()

    if request.method != 'POST':
        existing = handler.get_itr_by_id(id)
        handler.close()
        return render_template('update_itr.html', record=existing)

    # Copy every submitted field into a plain dict for the handler.
    submitted = {}
    for field in request.form:
        submitted[field] = request.form.get(field, 0)
    print("itr data-->", submitted)
    handler.update(id, data=submitted)
    handler.close()
    return redirect(url_for('display_itr'))
## 3. UPDATE an existing ITR record
# @app.route('/itr/update/<int:id>', methods=['GET', 'POST'])
# def update_itr(id):
# conn = get_db_connection()
# if request.method == 'POST':
# cursor = conn.cursor()
# columns = [
# 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
# 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
# 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
# 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
# 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
# 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
# ]
# # Create the "SET column = %s" part of the query
# set_clause = ', '.join([f"{col} = %s" for col in columns])
# query = f"UPDATE itr SET {set_clause} WHERE id = %s"
# values = [request.form.get(col, 0) for col in columns]
# values.append(id) # Add the ID for the WHERE clause at the end
# cursor.execute(query, tuple(values))
# conn.commit()
# cursor.close()
# conn.close()
# return redirect(url_for('display_itr'))
# # For a GET request, fetch the existing data and show it in the form
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM itr WHERE id = %s", (id,))
# record = cursor.fetchone()
# cursor.close()
# conn.close()
# return render_template('update_itr.html', record=record)
## ===============================================
## AO (Assessing Officer) Routes
## ===============================================
# 1. DISPLAY all AO records
@app.route('/ao_records')
def display_ao():
    """Fetch every AO record through AOHandler and render the listing page."""
    handler = AOHandler()
    rows = handler.get_all_ao()
    handler.close()
    return render_template('display_ao.html', ao_records=rows)
# 2. ADD a new AO record
@app.route('/ao/add', methods=['GET', 'POST'])
def add_ao():
    """Show the AO entry form on GET; pass the submitted form to AOHandler on POST."""
    if request.method != 'POST':
        return render_template('add_ao.html')

    handler = AOHandler()
    handler.add_ao(request.form)
    handler.close()
    flash("AO record added successfully!", "success")
    return redirect(url_for('display_ao'))
# 3. UPDATE AO record
@app.route('/ao/update/<int:id>', methods=['GET', 'POST'])
def update_ao(id):
    """Edit one AO record.

    GET renders the edit form pre-filled with the record; POST applies the
    submitted form and redirects to the listing. Responds 404 when no record
    has the given id.
    """
    ao = AOHandler()
    try:
        record = ao.get_ao_by_id(id)
        if record and request.method == 'POST':
            ao.update_ao(id, request.form.to_dict())
    finally:
        # Close on every path; previously the "not found" return skipped it.
        ao.close()

    if not record:
        return "AO record not found", 404
    if request.method == 'POST':
        flash("AO record updated successfully!", "success")
        return redirect(url_for('display_ao'))
    return render_template("update_ao.html", record=record)
# 4. DELETE AO record
@app.route('/ao/delete/<int:id>', methods=['POST'])
def delete_ao(id):
    """Delete the AO record with the given id, flash a confirmation and return to the listing."""
    handler = AOHandler()
    handler.delete_ao_by_id(id=id)
    handler.close()
    flash("AO deleted successfully!", "success")
    listing_url = url_for('display_ao')
    return redirect(listing_url)
# 3. UPDATE AO record
# @app.route('/ao/update/<int:id>', methods=['GET', 'POST'])
# def update_ao(id):
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM ao WHERE id = %s", (id,))
# ao_record = cursor.fetchone()
# if not ao_record:
# cursor.close()
# conn.close()
# return "AO record not found", 404
# if request.method == 'POST':
# columns = [
# 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
# 'deduction_80ia_business', 'deduction_sec37_disallowance', 'deduction_80g',
# 'net_taxable_income', 'tax_30_percent', 'tax_book_profit_18_5',
# 'surcharge_12', 'edu_cess_3', 'total_tax_payable', 'mat_credit',
# 'interest_234c', 'total_tax', 'advance_tax', 'tds', 'tcs',
# 'tax_on_assessment', 'refund'
# ]
# values = [request.form.get(col, 0) for col in columns]
# set_clause = ", ".join([f"{col}=%s" for col in columns])
# query = f"UPDATE ao SET {set_clause} WHERE id=%s"
# cursor.execute(query, tuple(values) + (id,))
# conn.commit()
# cursor.close()
# conn.close()
# flash("AO record updated successfully!", "success")
# return redirect(url_for('display_ao'))
# cursor.close()
# conn.close()
# return render_template('update_ao.html', record=ao_record)
## =======================================================
## CIT (Commissioner of Income Tax) Routes
## =======================================================
# DISPLAY all CIT records
@app.route('/cit_records')
def display_cit():
    """Fetch every CIT record through CITHandler and render the listing page."""
    handler = CITHandler()
    rows = handler.get_all_cit()
    handler.close()
    return render_template('display_cit.html', cit_records=rows)
@app.route('/cit/add', methods=['GET', 'POST'])
def add_cit():
    """Show the CIT entry form on GET; pass the submitted form to CITHandler on POST."""
    if request.method != 'POST':
        return render_template('add_cit.html')

    handler = CITHandler()
    handler.add_cit(request.form)
    handler.close()
    flash("CIT record added successfully!", "success")
    return redirect(url_for('display_cit'))
@app.route('/cit/delete/<int:id>', methods=['POST'])
def delete_cit(id):
    """Delete the CIT record with the given id, flash a confirmation and return to the listing."""
    handler = CITHandler()
    handler.delete_cit(id)
    handler.close()
    flash("CIT record deleted successfully!", "success")
    listing_url = url_for('display_cit')
    return redirect(listing_url)
@app.route('/cit/update/<int:id>', methods=['GET', 'POST'])
def update_cit(id):
    """Edit one CIT record.

    GET renders the edit form pre-filled with the record; POST applies the
    submitted fields and redirects to the listing. Responds 404 when no
    record has the given id.
    """
    cit = CITHandler()
    try:
        record = cit.get_cit_by_id(id)
        if record and request.method == 'POST':
            data = {k: request.form.get(k, 0) for k in request.form}
            cit.update_cit(id, data)
    finally:
        # Close even when the lookup or the update raises.
        cit.close()

    if not record:
        return "CIT record not found", 404
    if request.method == 'POST':
        return redirect(url_for('display_cit'))
    return render_template('update_cit.html', record=record)
    # The raw-SQL legacy implementation that used to follow here was removed:
    # only its decorator and def line had been commented out, so its
    # statements sat after the return above and could never run.
# DISPLAY all CIT records
# @app.route('/cit_records')
# def display_cit():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM cit ORDER BY year DESC, id DESC")
# cit_records = cursor.fetchall()
# cursor.close()
# conn.close()
# return render_template('display_cit.html', cit_records=cit_records)
# ADD a new CIT record
# @app.route('/cit/add', methods=['GET', 'POST'])
# def add_cit():
# if request.method == 'POST':
# conn = get_db_connection()
# cursor = conn.cursor()
# columns = [
# "year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance",
# "deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5",
# "tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit",
# "interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund"
# ]
# values = [request.form.get(col, 0) for col in columns]
# query = f"INSERT INTO cit ({', '.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})"
# cursor.execute(query, tuple(values))
# conn.commit()
# flash("ITAT record added successfully!", "success")
# cursor.close()
# conn.close()
# return redirect(url_for('display_cit'))
# return render_template('add_cit.html')
# @app.route('/cit/delete/<int:id>', methods=['POST'])
# def delete_cit(id):
# try:
# conn = get_db_connection()
# cursor = conn.cursor()
# cursor.execute("DELETE FROM cit WHERE id=%s", (id,))
# conn.commit()
# flash("ITR record deleted successfully!", "success")
# except Exception as err:
# print(f"Error deleting CIT record: {err}")
# finally:
# cursor.close()
# conn.close()
# return redirect(url_for('display_cit'))
## =======================================================
## ITAT (Income Tax Appellate Tribunal) Routes
## =======================================================
# DISPLAY all ITAT records
@app.route('/itat_records')
def display_itat():
    """Fetch every ITAT record through ITATHandler and render the listing page."""
    handler = ITATHandler()
    rows = handler.get_all_itat()
    handler.close()
    return render_template('display_itat.html', records=rows)
# ADD a new ITAT record
# @app.route('/itat/add', methods=['GET', 'POST'])
# def add_itat():
# cit = CITHandler()
# cit_records = cit.get_all_cit()
# cit.close()
# if request.method == 'POST':
# data = {
# "cit_id": request.form.get("cit_id"),
# "year": request.form.get("year"),
# "mat_tax_credit": request.form.get("mat_tax_credit"),
# "surcharge": request.form.get("surcharge"),
# "cess": request.form.get("cess"),
# "total_credit": request.form.get("total_credit")
# }
# itat = ITATHandler()
# itat.add_itat(data)
# itat.close()
# flash("ITAT Record Added Successfully!", "success")
# return redirect(url_for('display_itat'))
# return render_template('add_itat.html', cit_records=cit_records)
@app.route('/itat/delete/<int:id>', methods=['POST'])
def delete_itat(id):
    """Delete the ITAT record with the given id, flash a confirmation and return to the listing."""
    handler = ITATHandler()
    handler.delete_itat_by_id(id)
    handler.close()
    flash("ITAT Record Deleted!", "success")
    listing_url = url_for('display_itat')
    return redirect(listing_url)
@app.route('/itat/update/<int:id>', methods=['GET', 'POST'])
def update_itat(id):
    """Edit one ITAT record.

    GET renders the edit form pre-filled with the record; POST applies the
    five editable fields and redirects to the listing. When no record has
    the given id, a "danger" flash is set and the client is redirected to
    the listing.
    """
    editable_fields = ("year", "mat_tax_credit", "surcharge", "cess", "total_credit")

    itat = ITATHandler()
    try:
        record = itat.get_itat_by_id(id)
        if record and request.method == 'POST':
            data = {field: request.form.get(field) for field in editable_fields}
            itat.update_itat(id, data)
    finally:
        # Close on every path; previously the "not found" return skipped it.
        itat.close()

    if not record:
        flash("Record Not Found!", "danger")
        return redirect(url_for('display_itat'))
    if request.method == 'POST':
        flash("ITAT Record Updated!", "success")
        return redirect(url_for('display_itat'))
    return render_template('update_itat.html', record=record)
# DISPLAY all ITAT records
# @app.route('/itat_records')
# def display_itat():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# # Querying the 'itat' table
# cursor.execute("SELECT * FROM itat ORDER BY year DESC, id DESC")
# records = cursor.fetchall()
# cursor.close()
# conn.close()
# # Rendering the 'display_itat.html' template
# return render_template('display_itat.html', records=records)
# ADD a new ITAT record
# @app.route('/itat/add', methods=['GET', 'POST'])
# def add_itat():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# # Fetch all CIT records to choose from
# cursor.execute("SELECT id, year FROM cit ORDER BY year DESC")
# cit_records = cursor.fetchall()
# if request.method == 'POST':
# cit_id = request.form.get('cit_id') # selected parent CIT id
# columns = ['id', 'year','mat_tax_credit', 'surcharge', 'cess', 'total_credit']
# values = [cit_id,
# request.form.get('year', 0),
# request.form.get('mat_tax_credit', 0),
# request.form.get('surcharge', 0),
# request.form.get('cess', 0),
# request.form.get('total_credit', 0)]
# query = f"INSERT INTO itat ({', '.join(columns)}) VALUES ({', '.join(['%s'] * len(columns))})"
# cursor.execute(query, tuple(values))
# conn.commit()
# cursor.close()
# conn.close()
# flash("ITAT record added successfully!", "success")
# return redirect(url_for('display_itat'))
# cursor.close()
# conn.close()
# return render_template('add_itat.html', cit_records=cit_records)
@app.route('/itat/add', methods=['GET', 'POST'])
def add_itat():
    """Show the ITAT entry form on GET; pass the submitted fields to ITATHandler on POST."""
    handler = ITATHandler()

    if request.method != 'POST':
        handler.close()
        return render_template('add_itat.html')

    # Copy every submitted field into a plain dict for the handler.
    payload = {}
    for field in request.form:
        payload[field] = request.form.get(field, 0)
    handler.add_itat(payload)
    handler.close()
    flash("ITAT record added successfully!", "success")
    return redirect(url_for('display_itat'))
# @app.route('/itat/update/<int:id>', methods=['GET', 'POST'])
# def update_itat(id):
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# # Fetch the existing record
# cursor.execute("SELECT * FROM itat WHERE id=%s", (id,))
# record = cursor.fetchone()
# if not record:
# cursor.close()
# conn.close()
# flash("ITAT record not found!", "danger")
# return redirect(url_for('display_itat'))
# if request.method == 'POST':
# columns = ['year', 'mat_tax_credit', 'surcharge', 'cess', 'total_credit']
# values = [request.form.get(col, 0) for col in columns]
# set_clause = ", ".join([f"{col}=%s" for col in columns])
# query = f"UPDATE itat SET {set_clause} WHERE id=%s"
# cursor.execute(query, tuple(values) + (id,))
# conn.commit()
# cursor.close()
# conn.close()
# flash("ITAT record updated successfully!", "success")
# return redirect(url_for('display_itat'))
# cursor.close()
# conn.close()
# # Render a template with existing values filled in
# return render_template('update_itat.html', record=record)
# @app.route('/itat/delete/<int:id>', methods=['POST'])
# def delete_itat(id):
# try:
# conn = get_db_connection()
# cursor = conn.cursor()
# cursor.execute("DELETE FROM itat WHERE id=%s", (id,))
# conn.commit()
# flash("ITAT record deleted successfully!", "success")
# except Exception as err:
# flash(f"Error deleting ITAT: {err}", "danger")
# finally:
# cursor.close()
# conn.close()
# return redirect(url_for('display_itat'))
# NOTE: update_itat and delete_itat are already defined earlier in this file.
@app.route('/cit', methods=['GET', 'POST'])
def cit_form():
    """Show the CIT form on GET; insert one row into ``cit`` on POST.

    ``year`` is stored as submitted; every other column is converted to
    float, with a missing or blank field treated as 0. Non-numeric input is
    rejected with 400. After a successful insert the client is redirected to
    the dashboard.
    """
    if request.method != 'POST':
        return render_template('cit_form.html')

    numeric_columns = (
        'gross_total_income', 'deduction_80ia_business', 'deduction_sec37_disallowance',
        'deduction_80g', 'net_taxable_income', 'tax_30_percent', 'tax_book_profit_18_5',
        'tax_payable', 'surcharge_12', 'edu_cess_3', 'total_tax_payable', 'mat_credit',
        'interest_234c', 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment',
        'refund',
    )
    columns = ('year',) + numeric_columns

    # Convert before opening the connection so bad input cannot leak one.
    # `or 0` also covers fields submitted blank (''), which float() rejects.
    try:
        amounts = tuple(float(request.form.get(col) or 0) for col in numeric_columns)
    except ValueError:
        abort(400)
    values = (request.form.get('year'),) + amounts

    # Column names come from the constant tuple above, never from the
    # request, so building the statement text is safe; values stay
    # parameterised.
    placeholders = ', '.join(['%s'] * len(columns))
    query = f"INSERT INTO cit ({', '.join(columns)}) VALUES ({placeholders})"

    conn = mysql.connector.connect(**db_config)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, values)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    return redirect(url_for('index'))
@app.route('/itat', methods=['GET', 'POST'])
def itat_form():
    """Show the ITAT form on GET; insert one row into ``itat`` on POST.

    All five form fields are required (indexing ``request.form`` fails the
    request when one is absent). After a successful insert the client is
    redirected to the dashboard.
    """
    if request.method != 'POST':
        return render_template('itat_form.html')

    mat_tax_credit = request.form['mat_tax_credit']
    surcharge = request.form['surcharge']
    cess = request.form['cess']
    total_credit = request.form['total_credit']
    year = request.form['year']

    conn = mysql.connector.connect(**db_config)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("""
                INSERT INTO itat (year, mat_tax_credit, surcharge, cess, total_credit)
                VALUES (%s,%s, %s, %s, %s)
            """, (year, mat_tax_credit, surcharge, cess, total_credit))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even when the insert fails.
        conn.close()
    return redirect(url_for('index'))
def get_db_connection():
    """Open and return a new mysql.connector connection built from ``db_config``.

    The caller is responsible for closing the returned connection.
    """
    return mysql.connector.connect(**db_config)
@app.route('/reports')
def reports():
    """Render the reports landing page (``reports.html``)."""
    page = render_template("reports.html")
    return page
# ITR report: year picker, or Excel download for the selected year
@app.route('/itr_report', methods=['GET'])
def itr_report():
    """Serve the ITR report.

    Without a ``year`` query parameter the year-selection page is rendered.
    With one, the workbook produced by ITRHandler.itr_report_download is sent
    as an attachment, or a plain message when the handler returns None.
    """
    selected_year = request.args.get('year')

    if not selected_year:
        # Only create YearGet where it is used; it was previously created
        # on every request but closed only on this path.
        year_getter = YearGet()
        try:
            years = year_getter.get_year_by_model("GetITRYears")
        finally:
            year_getter.close()
        return render_template("itr_reports.html", years=years)

    itr = ITRHandler()
    try:
        output = itr.itr_report_download(selected_year)
    finally:
        itr.close()

    if output is None:
        return "No records found for the selected year."
    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f"ITR_Report_{selected_year}.xlsx"
    )
# @app.route('/itr/reports', methods=['GET', 'POST'])
# def itr_reports():
# yearGetter = YearGet()
# if request.method == "POST":
# selected_year = request.form.get("year")
# itr=ITRHandler()
# yearGetter.close()
# return redirect(url_for("itr_report_result", year=selected_year))
# # GET method → fetch all distinct years through procedure
# years = yearGetter.get_year_by_model("GetITRYears")
# yearGetter.close()
# print("---- year --",years)
# return render_template("itr_reports.html", years=years)
@app.route('/ao_report', methods=['GET'])
def ao_report():
    """Serve the AO report.

    Without a ``year`` query parameter the year-selection page is rendered.
    With one, the matching ``ao`` rows are transposed (one row per field, one
    column per record) and returned as an Excel attachment; a plain message
    is returned when the year has no rows.
    """
    selected_year = request.args.get('year')
    connection = pymysql.connect(**db_config)
    try:
        if not selected_year:
            with connection.cursor() as cursor:
                cursor.execute("SELECT DISTINCT year FROM ao ORDER BY year DESC")
                years = [row[0] for row in cursor.fetchall()]
            return render_template("ao_reports.html", years=years)

        records = pd.read_sql(
            "SELECT * FROM ao WHERE year = %s", connection, params=[selected_year]
        )
        if records.empty:
            return "No records found for the selected year."

        # Rows become fields, columns become records.
        vertical = records.transpose()
        vertical.insert(0, 'Field', vertical.index)
        # Label the record columns "Record 1", "Record 2", ...
        record_labels = [f"Record {i}" for i in range(1, vertical.shape[1])]
        vertical.columns = ['Field'] + record_labels
        vertical.reset_index(drop=True, inplace=True)

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            vertical.to_excel(writer, index=False, sheet_name='AO_Vertical')
            # Widen the 'Field' column
            writer.sheets['AO_Vertical'].set_column(0, 0, 30)
        output.seek(0)

        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=f"AO_Report_{selected_year}.xlsx"
        )
    finally:
        connection.close()
@app.route('/cit_report', methods=['GET'])
def cit_report():
    """Serve the CIT report.

    Without a ``year`` query parameter the year-selection page is rendered.
    With one, each matching ``cit`` row is written as a vertical Field/Value
    block, one below the other on a single sheet, and returned as an Excel
    attachment; a plain message is returned when the year has no rows.
    """
    selected_year = request.args.get('year')
    connection = pymysql.connect(**db_config)
    try:
        if not selected_year:
            # Render dropdown for year selection
            with connection.cursor() as cursor:
                cursor.execute("SELECT DISTINCT year FROM cit ORDER BY year DESC")
                years = [row[0] for row in cursor.fetchall()]
            return render_template("cit_reports.html", years=years)

        # Fetch data from the `cit` table for the selected year
        query = "SELECT * FROM cit WHERE year = %s"
        df = pd.read_sql(query, connection, params=[selected_year])
        # Same response as the AO and ITR reports instead of an empty workbook.
        if df.empty:
            return "No records found for the selected year."

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            for position, (_, row) in enumerate(df.iterrows()):
                # Convert the row to vertical format
                vertical_df = pd.DataFrame(row).reset_index()
                vertical_df.columns = ['Field', 'Value']
                # Each block is a header row plus len(vertical_df) rows, so
                # this stride leaves two blank rows between blocks.
                start_row = position * (len(vertical_df) + 3)
                vertical_df.to_excel(
                    writer, sheet_name='CIT_Report', index=False, startrow=start_row
                )
        output.seek(0)

        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=f"CIT_Report_{selected_year}_Vertical.xlsx"
        )
    finally:
        connection.close()
@app.route('/itat_report', methods=['GET'])
def itat_report():
    """Serve the ITAT report.

    Without a ``year`` query parameter the year-selection page is rendered.
    With one, the matching ``itat`` rows are transposed and returned as an
    Excel attachment; a plain message is returned when the year has no rows.
    """
    selected_year = request.args.get('year')
    connection = pymysql.connect(**db_config)
    try:
        if not selected_year:
            with connection.cursor() as cursor:
                cursor.execute("SELECT DISTINCT year FROM itat ORDER BY year DESC")
                years = [row[0] for row in cursor.fetchall()]
            return render_template("itat_reports.html", years=years)

        query = "SELECT * FROM itat WHERE year = %s"
        df = pd.read_sql(query, connection, params=[selected_year])
        # Same response as the AO and ITR reports instead of a workbook
        # that contains only field names.
        if df.empty:
            return "No records found for the selected year."

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            # Transposed: field names in the first column, one column per record.
            df.T.to_excel(writer, header=False, sheet_name='ITAT_Report')
        output.seek(0)

        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=f"ITAT_Report_{selected_year}_Vertical.xlsx"
        )
    finally:
        connection.close()
# @app.route('/itr_report_download', methods=['GET'])
# def itr_report_download():
# connection = pymysql.connect(**db_config)
# try:
# selected_year = request.args.get('year')
# if selected_year:
# query = "SELECT * FROM itr WHERE year = %s"
# df = pd.read_sql(query, connection, params=[selected_year])
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
# df.to_excel(writer, index=False, sheet_name=f"ITR {selected_year}")
# output.seek(0)
# return send_file(
# output,
# download_name=f"ITR_Report_{selected_year}.xlsx",
# as_attachment=True,
# mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
# )
# else:
# # If no year is selected, show dropdown
# with connection.cursor() as cursor:
# cursor.execute("SELECT DISTINCT year FROM itr ORDER BY year DESC")
# years = [row[0] for row in cursor.fetchall()]
# return render_template('itr_reports.html', years=years)
# finally:
# connection.close()
@app.route('/download/<int:doc_id>')
def download_report(doc_id):
    """Send the stored file of the ``documents`` row with id *doc_id* as an attachment.

    Responds 404 when no such row exists.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM documents WHERE id = %s", (doc_id,))
            document = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    if not document:
        return "Document not found", 404

    # Files are served from static/uploads; adjust as per your storage.
    return send_from_directory(
        directory='static/uploads',
        path=document['filename'],
        as_attachment=True,
    )
@app.route('/summary_report', methods=['GET'])
def summary_report():
    """Serve the cross-stage summary report.

    Without a ``year`` query parameter, the distinct years found in the
    ``itr``, ``ao``, ``cit`` and ``itat`` tables are collected and the
    year-selection page is rendered. With one, a formatted Excel sheet is
    returned with one row per particular and one column per stage; a value
    that is not available for a stage is shown as ``'-'``.
    """
    stage_tables = ['itr', 'ao', 'cit', 'itat']

    year = request.args.get('year')
    if not year:
        connection = pymysql.connect(**db_config)
        try:
            years = set()
            for table in stage_tables:
                # Table names come from the constant list above, not the request.
                df = pd.read_sql(f"SELECT DISTINCT year FROM {table}", connection)
                years.update(int(y) for y in df['year'].dropna().tolist())
            return render_template(
                'summary_reports.html',
                years=sorted(years),
                message="Please select a year to download.",
            )
        finally:
            connection.close()

    connection = pymysql.connect(**db_config)
    try:
        stage_data = {}
        for stage in stage_tables:
            query = f"SELECT * FROM {stage} WHERE year = %s"
            stage_data[stage.upper()] = pd.read_sql(query, connection, params=[year])

        def safe_get(df, candidates):
            """Return the first row's value of the first candidate column present, else '-'."""
            if df.empty:
                return '-'
            for col in candidates:
                if col in df.columns:
                    return df[col].values[0]
            return '-'

        # (label, candidate column names). The first candidate is the key this
        # report has always looked up; the second, where present, is the column
        # name this file actually inserts into `cit`, so the row no longer
        # falls back to '-'.
        # "Total Tax Payable" and "Total Tax" were previously mapped to each
        # other's columns; in the `cit` column order total_tax_payable sits
        # before mat_credit and total_tax after interest_234c, matching the
        # label order used here.
        # NOTE(review): 'gti', 'deduction_80ia', 'net_tax' and 'sat' have no
        # obvious counterpart among the columns visible in this file - confirm
        # against the real schema.
        rows = [
            ("Gross Total Income", ('gross_total_income',)),
            ("Add: Disallowance u/s 14A", ('disallowance_14a',)),
            ("Add: Disallowance u/s 37", ('disallowance_37',)),
            ("GTI as per", ('gti',)),
            ("Less: Deduction u/s 80IA", ('deduction_80ia',)),
            ("Less: Deduction u/s 80G", ('deduction_80g',)),
            ("Net Taxable Income", ('net_taxable_income',)),
            ("Tax @ 30%", ('tax_30', 'tax_30_percent')),
            ("Tax @ 18.5% on Book Profit", ('book_profit_tax', 'tax_book_profit_18_5')),
            ("Surcharge @ 12%", ('surcharge_12',)),
            ("Education Cess @ 3%", ('education_cess', 'edu_cess_3')),
            ("Total Tax Payable", ('total_tax_payable',)),
            ("Less: MAT Credit", ('mat_credit',)),
            ("Net Tax", ('net_tax',)),
            ("Add: Interest u/s 234C", ('interest_234c',)),
            ("Total Tax", ('total_tax',)),
            ("Advance Tax", ('advance_tax',)),
            ("TDS", ('tds',)),
            ("TCS", ('tcs',)),
            ("SAT", ('sat',)),
            ("Tax on Regular Assessment", ('tax_regular', 'tax_on_assessment')),
            ("Refund", ('refund',)),
        ]

        data = {"Particulars": [label for label, _ in rows]}
        for heading, stage_key in (("ITR", 'ITR'), ("AO", 'AO'), ("CIT(A)", 'CIT'), ("ITAT", 'ITAT')):
            data[heading] = [safe_get(stage_data[stage_key], cands) for _, cands in rows]
        df = pd.DataFrame(data)

        # Export to Excel with formatting
        sheet_name = f'AY {year}'
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            df.to_excel(writer, index=False, sheet_name=sheet_name)
            workbook = writer.book
            worksheet = writer.sheets[sheet_name]

            # Format definitions
            header_format = workbook.add_format({
                'bold': True,
                'text_wrap': True,
                'valign': 'middle',
                'align': 'center',
                'bg_color': '#007bff',
                'font_color': 'white',
                'border': 1
            })
            cell_format = workbook.add_format({
                'border': 1,
                'valign': 'top',
                'align': 'center',
            })

            # Header row plus a column width fitted to the longest cell text
            for col_num, value in enumerate(df.columns):
                worksheet.write(0, col_num, value, header_format)
                max_len = max(df[value].astype(str).map(len).max(), len(str(value))) + 2
                worksheet.set_column(col_num, col_num, max_len)

            # Rewrite the data cells so they pick up the bordered format
            for row_num in range(1, len(df) + 1):
                for col_num in range(len(df.columns)):
                    worksheet.write(row_num, col_num, df.iloc[row_num - 1, col_num], cell_format)
        output.seek(0)

        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=f"Summary_Report_{year}.xlsx"
        )
    finally:
        connection.close()
if __name__ == '__main__':
    # The interactive debugger lets anyone who can reach the server run
    # arbitrary Python, so it must not be on by default while listening on
    # all interfaces. Opt in for local development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5003, debug=debug_enabled)