Files
IncomeTaxSystem/main.py

597 lines
17 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, send_from_directory, abort, flash,send_file ,jsonify
import os
from dotenv import load_dotenv
load_dotenv()
import pandas as pd
from werkzeug.utils import secure_filename
from datetime import date
from AppCode.Config import DBConfig
from AppCode.LoginAuth import LoginAuth
from AppCode.FileHandler import FileHandler
from AppCode.YearGet import YearGet
from AppCode.DocumentHandler import DocumentHandler
from AppCode.ITRHandler import ITRHandler
from AppCode.AOHandler import AOHandler
from AppCode.CITHandler import CITHandler
from AppCode.ITATHandler import ITATHandler
from AppCode.MatCreditHandler import MatCreditHandler
import subprocess
# Server
app = Flask(__name__)
app.secret_key=os.getenv("SECRET_KEY")
auth = LoginAuth()
app.register_blueprint(auth.bp)
# welcome page
@app.route('/')
@auth.login_required
def welcome():
return render_template('index.html')
# Dashboard page
@app.route('/dashboard')
@auth.login_required
def index():
return render_template('index.html')
# Upload File route
@app.route('/upload', methods=['GET', 'POST'])
@auth.login_required
def upload_file():
if request.method == 'POST':
FileHandler.CHeckExistingOrCreateNewUploadFolder()
docHandler = DocumentHandler()
docHandler.Upload(request=request)
return redirect(url_for('view_documents'))
return render_template('upload.html')
# View all documents with filters
@app.route('/documents')
@auth.login_required
def view_documents():
docHandler = DocumentHandler()
docHandler.View(request=request)
return render_template('view_docs.html', documents=docHandler.documents, years=docHandler.years)
# Upload file documents
@app.route('/uploads/<filename>')
@auth.login_required
def uploaded_file(filename):
mode = request.args.get('mode', 'view')
filepath = os.path.join(FileHandler.UPLOAD_FOLDER, secure_filename(filename))
if not os.path.exists(filepath):
flash("Unsupported file type for viewing", "warning")
return redirect(url_for('view_documents'))
file_ext = filename.rsplit('.', 1)[-1].lower()
# --- View Mode ---
if mode == 'view':
# pdf
if file_ext == 'pdf':
return send_file(filepath, mimetype='application/pdf')
# Word
elif file_ext in ['doc', 'docx']:
return send_file(filepath, as_attachment=True)
# Excel
elif file_ext in ['xls', 'xlsx']:
return send_file(filepath, as_attachment=False, download_name=filename, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
else:
flash("Unsupported file type for viewing", "warning")
return redirect(url_for('view_documents'))
return send_file(filepath, as_attachment=True, download_name=filename)
## ===============================================
## ITR (Income Tax Return) Routes
## ===============================================
## 1. READ/DISPLAY all ITR records
@app.route('/itr_records')
@auth.login_required
def display_itr():
itr = ITRHandler()
records = itr.get_all_itr()
itr.close()
return render_template('display_itr.html', records=records)
## 2. CREATE/ADD a new ITR record
@app.route('/itr/add', methods=['GET', 'POST'])
@auth.login_required
def add_itr():
if request.method == 'POST':
itr = ITRHandler()
mat = MatCreditHandler()
itr.add_itr(request.form)
itr.close()
if 'documents' in request.files:
doc = DocumentHandler()
doc.Upload(request)
# AUTO SAVE MAT FROM ITR
mat.save_from_itr(
year=request.form["year"],
mat_created=float(request.form.get("mat_credit_created", 0)),
mat_utilized=float(request.form.get("mat_credit_utilized", 0)),
remarks="Created via ITR"
)
# flash("ITR record added successfully!", "success")
flash("ITR record and documents uploaded successfully!", "success")
return redirect(url_for('display_itr'))
return render_template('add_itr.html',current_date=date.today().isoformat())
## 4. DELETE an ITR record
@app.route('/itr/delete/<int:id>', methods=['POST'])
@auth.login_required
def delete_itr(id):
itr = ITRHandler()
itr.delete_itr_by_id(id=id)
itr.close()
return redirect(url_for('display_itr'))
## 3. UPDATE an existing ITR record
@app.route('/itr/update/<int:id>', methods=['GET', 'POST'])
@auth.login_required
def update_itr(id):
itr = ITRHandler()
if request.method == 'POST':
data = {k: request.form.get(k, 0) for k in request.form}
itr.update(id, data)
itr.close()
return redirect(url_for('display_itr'))
record = itr.get_itr_by_id(id)
itr.close()
return render_template('update_itr.html', record=record, current_date=date.today().isoformat())
## ===============================================
## AO (Assessing Officer) Routes
## ===============================================
# 1. DISPLAY all AO records
@app.route('/ao_records')
@auth.login_required
def display_ao():
ao = AOHandler()
ao_records = ao.get_all_ao()
ao.close()
return render_template('display_ao.html', ao_records=ao_records)
# 2. ADD a new AO record
@app.route('/ao/add', methods=['GET', 'POST'])
@auth.login_required
def add_ao():
if request.method == 'POST':
ao = AOHandler()
mat = MatCreditHandler()
ao.add_ao(request.form)
ao.close()
if 'documents' in request.files:
doc = DocumentHandler()
doc.Upload(request)
# AUTO SAVE MAT FROM ITR
mat.save_from_itr(
year=request.form["year"],
mat_created=float(request.form.get("mat_credit_created", 0)),
mat_utilized=float(request.form.get("mat_credit_utilized", 0)),
remarks="Created via ao"
)
flash("AO record added successfully!", "success")
return redirect(url_for('display_ao'))
return render_template('add_ao.html',current_date=date.today().isoformat())
# 3. UPDATE AO record
@app.route('/ao/update/<int:id>', methods=['GET', 'POST'])
@auth.login_required
def update_ao(id):
ao = AOHandler()
record = ao.get_ao_by_id(id)
if not record:
return "AO record not found", 404
if request.method == 'POST':
data = request.form.to_dict()
ao.update_ao(id, data)
ao.close()
flash("AO record updated successfully!", "success")
return redirect(url_for('display_ao'))
ao.close()
return render_template("update_ao.html", record=record)
# 4. DELETE AO record safely
@app.route('/ao/delete/<int:id>', methods=['POST'])
@auth.login_required
def delete_ao(id):
ao = AOHandler()
ao.delete_ao_by_id(id=id)
ao.close()
flash("AO deleted successfully!", "success")
return redirect(url_for('display_ao'))
## =======================================================
## CIT (Commissioner of Income Tax) Routes
## =======================================================
# 1 DISPLAY all CIT records
@app.route('/cit_records')
@auth.login_required
def display_cit():
cit = CITHandler()
cit_records = cit.get_all_cit()
cit.close()
return render_template('display_cit.html', cit_records=cit_records)
# 2 new CIT records add
@app.route('/cit/add', methods=['GET', 'POST'])
@auth.login_required
def add_cit():
if request.method == 'POST':
cit = CITHandler()
mat = MatCreditHandler()
cit.add_cit(request.form)
cit.close()
if 'documents' in request.files:
doc = DocumentHandler()
doc.Upload(request)
# AUTO SAVE MAT FROM ITR
mat.save_from_itr(
year=request.form["year"],
mat_created=float(request.form.get("mat_credit_created", 0)),
mat_utilized=float(request.form.get("mat_credit_utilized", 0)),
remarks="Created via cit"
)
flash("CIT record added successfully!", "success")
return redirect(url_for('display_cit'))
return render_template('add_cit.html', current_date=date.today().isoformat())
# 3 delete CIT records by id
@app.route('/cit/delete/<int:id>', methods=['POST'])
@auth.login_required
def delete_cit(id):
cit = CITHandler()
cit.delete_cit(id)
cit.close()
flash("CIT record deleted successfully!", "success")
return redirect(url_for('display_cit'))
# 4 update CIT records by id
@app.route('/cit/update/<int:id>', methods=['GET', 'POST'])
@auth.login_required
def update_cit(id):
cit = CITHandler()
record = cit.get_cit_by_id(id)
if not record:
cit.close()
return "CIT record not found", 404
if request.method == 'POST':
data = {k: request.form.get(k, 0) for k in request.form}
cit.update_cit(id, data)
cit.close()
return redirect(url_for('display_cit'))
cit.close()
return render_template('update_cit.html', record=record)
## =======================================================
## ITAT (Income Tax Appellate Tribunal) Routes
## =======================================================
# 1.DISPLAY all ITAT records
@app.route('/itat_records')
@auth.login_required
def display_itat():
itat = ITATHandler()
records = itat.get_all_itat()
itat.close()
return render_template('display_itat.html', records=records)
# 2.Add new ITAT records
@app.route('/itat/add', methods=['GET', 'POST'])
@auth.login_required
def add_itat():
if request.method == 'POST':
itat = ITATHandler()
data = {k: request.form.get(k, 0) for k in request.form}
itat.add_itat(data)
itat.close()
flash("ITAT record added successfully!", "success")
return redirect(url_for('display_itat'))
return render_template('add_itat.html')
# 3.Update ITAT records by id
@app.route('/itat/update/<int:id>', methods=['GET', 'POST'])
@auth.login_required
def update_itat(id):
itat = ITATHandler()
record = itat.get_itat_by_id(id)
if not record:
flash("Record Not Found!", "danger")
return redirect(url_for('display_itat'))
if request.method == 'POST':
itat.update_itat(id, request.form)
itat.close()
flash("ITAT Record Updated!", "success")
return redirect(url_for('display_itat'))
itat.close()
return render_template('update_itat.html', record=record)
# 3.delete ITAT records by id
@app.route('/itat/delete/<int:id>', methods=['POST'])
@auth.login_required
def delete_itat(id):
itat = ITATHandler()
itat.delete_itat_by_id(id)
itat.close()
flash("ITAT Record Deleted!", "success")
return redirect(url_for('display_itat'))
## =======================================================
## All Report Routes
## =======================================================
# report page
@app.route('/reports')
@auth.login_required
def reports():
return render_template("reports.html")
# Itr report download by year
@app.route('/itr_report', methods=['GET'])
@auth.login_required
def itr_report():
yearGetter = YearGet()
selected_year = request.args.get('year')
if selected_year:
itr = ITRHandler()
output = itr.itr_report_download(selected_year)
itr.close()
if output is None:
return "No records found for the selected year."
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=f"ITR_Report_{selected_year}.xlsx"
)
else:
years = yearGetter.get_year_by_model("GetITRYears")
yearGetter.close()
return render_template("itr_reports.html", years=years)
# Ao report download by year
@app.route('/ao_report', methods=['GET'])
@auth.login_required
def ao_report():
yearGetter = YearGet()
selected_year = request.args.get('year')
if selected_year:
ao = AOHandler()
output = ao.ao_report_download(selected_year)
ao.close()
if output is None:
return "No records found for the selected year."
return send_file(
output,
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
as_attachment=True,
download_name=f"AO_Report_{selected_year}.xlsx"
)
else:
years = yearGetter.get_year_by_model("GetAOYears")
yearGetter.close()
return render_template("ao_reports.html", years=years)
# Cit report download by year
@app.route('/cit_report', methods=['GET'])
@auth.login_required
def cit_report():
selected_year = request.args.get('year')
yearGetter = YearGet()
if selected_year:
cit = CITHandler()
output = cit.cit_report_download(selected_year)
cit.close()
if output is None:
return "No records found for the selected year."
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=f"CIT_Report_{selected_year}_Vertical.xlsx"
)
else:
years = yearGetter.get_year_by_model("GetCITYears")
yearGetter.close()
return render_template("cit_reports.html", years=years)
# Itat report download by year
@app.route('/itat_report', methods=['GET'])
@auth.login_required
def itat_report():
selected_year = request.args.get('year')
yearGetter = YearGet()
if selected_year:
itat = ITATHandler()
output = itat.itat_report_download(selected_year)
itat.close()
if output is None:
return "No records found for the selected year."
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=f"ITAT_Report_{selected_year}_Vertical.xlsx"
)
else:
# Use stored procedure for years
years = yearGetter.get_year_by_model("GetITATYears")
yearGetter.close()
return render_template("itat_reports.html", years=years)
# summary report
@app.route('/summary_report', methods=['GET'])
@auth.login_required
def summary_report():
docHandler = DocumentHandler()
return docHandler.Summary_report(request=request)
@app.route('/summary/download', methods=['GET'])
@auth.login_required
def download_summary():
year_raw = request.args.get('year')
if not year_raw:
return "Year parameter is required", 400
docHandler = DocumentHandler()
# reuse your existing Summary_report method
return docHandler.Summary_report(request=request)
# check year in table existe or not by using ajax calling.
# @app.route('/check_year', methods=['POST'])
# @auth.login_required
# def check_year():
# data = request.get_json()
# table_name = data.get("table")
# year = data.get("year")
# check_year_obj = YearGet()
# result = check_year_obj.CheckYearExists(table_name, year)
# check_year_obj.close()
# return result
@app.route('/check_year', methods=['POST'])
def check_year():
table_name = request.json.get("table")
year = request.json.get("year")
conn = DBConfig.get_db_connection()
cursor = conn.cursor()
sqlstr = f"SELECT COUNT(*) FROM {table_name} WHERE year = %s"
cursor.execute(sqlstr, (year,))
result = cursor.fetchone()[0]
cursor.close()
conn.close()
return {"exists": result > 0}
# Mat credit from
@app.route("/mat_credit", methods=["GET"])
@auth.login_required
def mat_credit():
mat = MatCreditHandler()
try:
mat_rows, utilization_rows = mat.fetch_all()
finally:
mat.close()
utilization_map = {}
all_years = set()
for u in utilization_rows:
all_years.add(u["utilized_year"])
utilization_map.setdefault(
u["mat_credit_id"], {}
)[u["utilized_year"]] = u["utilized_amount"]
return render_template(
"mat_credit.html",
mat_rows=mat_rows,
utilization_map=utilization_map,
added_years=sorted(all_years)
)
# save mat credit row data
@app.route("/save_mat_row", methods=["POST"])
@auth.login_required
def save_mat_row():
mat = MatCreditHandler()
try:
mat.save_single(request.json)
return jsonify({"message": "Row saved successfully"})
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
mat.close()
@app.route("/summary/preview")
def summary_preview_route():
handler = DocumentHandler()
return handler.Summary_preview(request)
# save mat credit bulk data
# @app.route("/save_mat_all", methods=["POST"])
# @auth.login_required
# def save_mat_all():
# mat= MatCreditHandler()
# try:
# skipped = mat.save_bulk(request.json)
# return jsonify({"message": "Saved successfully", "skipped": skipped})
# except Exception as e:
# return jsonify({"error": str(e)}), 500
# run server
if __name__ == '__main__':
app.run(
host=os.getenv("FLASK_HOST"),
port=int(os.getenv("FLASK_PORT")),
debug=os.getenv("FLASK_DEBUG") == "true"
)