cit model commit.

This commit is contained in:
2025-12-01 12:04:07 +05:30
parent 0d6c873515
commit aa063b7a80
8 changed files with 456 additions and 95 deletions

View File

@@ -24,7 +24,7 @@ class AOHandler:
def get_ao_by_id(self, id): def get_ao_by_id(self, id):
# Call stored procedure # Call stored procedure
self.cursor.callproc('GetAORById', [id]) self.cursor.callproc('GetAOById', [id])
# Fetch result # Fetch result
records = [] records = []

74
AppCode/CITHandler.py Normal file
View File

@@ -0,0 +1,74 @@
from AppCode.Config import DBConfig
import mysql.connector
class CITHandler:
def __init__(self):
db = DBConfig()
self.conn = db.get_db_connection()
self.cursor = self.conn.cursor(dictionary=True)
# GET ALL CIT RECORDS
def get_all_cit(self):
self.cursor.callproc("GetAllCIT")
records = []
for result in self.cursor.stored_results():
records = result.fetchall()
return records
# GET CIT BY ID
def get_cit_by_id(self, id):
self.cursor.callproc("GetCITById", [id])
records = []
for result in self.cursor.stored_results():
records = result.fetchall()
if records:
return records[0]
return None
# INSERT CIT RECORD
def add_cit(self, data):
columns = [
"year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance",
"deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5",
"tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit",
"interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund"
]
values = [data.get(col, 0) for col in columns]
self.cursor.callproc("InsertCIT", values)
self.conn.commit()
# UPDATE CIT RECORD
# def update_cit(self, id, data):
# columns = [
# "year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance",
# "deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5",
# "tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit",
# "interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund"
# ]
# set_clause = ", ".join([f"{col}=%s" for col in columns])
# query = f"UPDATE cit SET {set_clause} WHERE id=%s"
# values = [data.get(col, 0) for col in columns]
# values.append(id)
# self.cursor.execute(query, tuple(values))
# self.conn.commit()
# DELETE CIT RECORD
def delete_cit(self, id):
self.cursor.callproc("DeleteCITById", [id])
self.conn.commit()
# CLOSE CONNECTION
def close(self):
self.cursor.close()
self.conn.close()

67
AppCode/ITATHandler.py Normal file
View File

@@ -0,0 +1,67 @@
# AppCode/ITATHandler.py
from AppCode.Config import DBConfig
class ITATHandler:
def __init__(self):
self.conn = DBConfig.get_db_connection()
self.cursor = self.conn.cursor(dictionary=True)
# GET ALL ITAT RECORDS (PROC)
def get_all_itat(self):
self.cursor.callproc("GetAllITAT")
records = []
for result in self.cursor.stored_results():
records = result.fetchall()
return records
# GET ITAT BY ID (PROC)
def get_itat_by_id(self, id):
self.cursor.callproc("GetITATById", [id])
records = []
for result in self.cursor.stored_results():
records = result.fetchall()
if records:
return records[0]
return None
# INSERT ITAT (PROC)
def add_itat(self, data):
values = [
data.get("cit_id"),
data.get("year"),
data.get("mat_tax_credit"),
data.get("surcharge"),
data.get("cess"),
data.get("total_credit")
]
self.cursor.callproc("InsertITAT", values)
self.conn.commit()
# UPDATE ITAT (PROC)
def update_itat(self, id, data):
values = [
id,
data.get("year"),
data.get("mat_tax_credit"),
data.get("surcharge"),
data.get("cess"),
data.get("total_credit")
]
self.cursor.callproc("UpdateITAT", values)
self.conn.commit()
# DELETE ITAT BY ID (PROC)
def delete_itat_by_id(self, id):
self.cursor.callproc("DeleteITATById", [id])
self.conn.commit()
# CLOSE CONNECTION
def close(self):
self.cursor.close()
self.conn.close()

Binary file not shown.

Binary file not shown.

305
main.py
View File

@@ -4,13 +4,16 @@ import mysql.connector
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from AppCode.FileHandler import FileHandler from AppCode.FileHandler import FileHandler
from AppCode.DocumentHandler import DocumentHandler from AppCode.DocumentHandler import DocumentHandler
from AppCode.ITRHandler import ITRHandler from AppCode.ITRHandler import ITRHandler
from AppCode.AOHandler import AOHandler from AppCode.AOHandler import AOHandler
from AppCode.CITHandler import CITHandler
from AppCode.ITATHandler import ITATHandler
from config import db_config from config import db_config
from AppCode.Config import DBConfig from AppCode.Config import DBConfig
app = Flask(__name__) app = Flask(__name__)
app.secret_key="secret1234" app.secret_key="secret1234"
app.config['UPLOAD_FOLDER'] = FileHandler.UPLOAD_FOLDER app.config['UPLOAD_FOLDER'] = FileHandler.UPLOAD_FOLDER
@@ -268,41 +271,51 @@ def update_ao(id):
# DISPLAY all CIT records # DISPLAY all CIT records
@app.route('/cit_records') @app.route('/cit_records')
def display_cit(): def display_cit():
conn = get_db_connection() cit = CITHandler()
cursor = conn.cursor(dictionary=True) cit_records = cit.get_all_cit()
cursor.execute("SELECT * FROM cit ORDER BY year DESC, id DESC") cit.close()
cit_records = cursor.fetchall()
cursor.close()
conn.close()
return render_template('display_cit.html', cit_records=cit_records) return render_template('display_cit.html', cit_records=cit_records)
# ADD a new CIT record
@app.route('/cit/add', methods=['GET', 'POST']) @app.route('/cit/add', methods=['GET', 'POST'])
def add_cit(): def add_cit():
if request.method == 'POST': if request.method == 'POST':
conn = get_db_connection() cit = CITHandler()
cursor = conn.cursor() cit.add_cit(request.form)
cit.close()
columns = [ flash("CIT record added successfully!", "success")
"year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance",
"deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5",
"tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit",
"interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund"
]
values = [request.form.get(col, 0) for col in columns]
query = f"INSERT INTO cit ({', '.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})"
cursor.execute(query, tuple(values))
conn.commit()
flash("ITAT record added successfully!", "success")
cursor.close()
conn.close()
return redirect(url_for('display_cit')) return redirect(url_for('display_cit'))
return render_template('add_cit.html') return render_template('add_cit.html')
@app.route('/cit/delete/<int:id>', methods=['POST'])
def delete_cit(id):
cit = CITHandler()
cit.delete_cit(id)
cit.close()
flash("CIT record deleted successfully!", "success")
return redirect(url_for('display_cit'))
# @app.route('/cit/update/<int:id>', methods=['GET', 'POST'])
# def update_cit(id):
# handler = CITHandler()
# record = handler.get_cit_by_id(id)
# if not record:
# handler.close()
# return "CIT record not found", 404
# if request.method == 'POST':
# handler.update_cit(id, request.form)
# handler.close()
# return redirect(url_for('display_cit'))
# handler.close()
# return render_template('add_cit.html', record=record)
@app.route('/cit/update/<int:id>', methods=['GET', 'POST']) @app.route('/cit/update/<int:id>', methods=['GET', 'POST'])
def update_cit(id): def update_cit(id):
conn = get_db_connection() conn = get_db_connection()
@@ -334,30 +347,61 @@ def update_cit(id):
conn.close() conn.close()
return render_template('add_cit.html', record=record) return render_template('add_cit.html', record=record)
# DISPLAY all CIT records
# @app.route('/cit_records')
# def display_cit():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM cit ORDER BY year DESC, id DESC")
# cit_records = cursor.fetchall()
# cursor.close()
# conn.close()
# return render_template('display_cit.html', cit_records=cit_records)
@app.route('/cit/delete/<int:id>', methods=['POST'])
def delete_cit(id): # ADD a new CIT record
try: # @app.route('/cit/add', methods=['GET', 'POST'])
conn = get_db_connection() # def add_cit():
cursor = conn.cursor() # if request.method == 'POST':
cursor.execute("DELETE FROM cit WHERE id=%s", (id,)) # conn = get_db_connection()
conn.commit() # cursor = conn.cursor()
flash("ITR record deleted successfully!", "success")
except Exception as err: # columns = [
print(f"Error deleting CIT record: {err}") # "year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance",
finally: # "deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5",
cursor.close() # "tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit",
conn.close() # "interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund"
return redirect(url_for('display_cit')) # ]
# values = [request.form.get(col, 0) for col in columns]
# query = f"INSERT INTO cit ({', '.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})"
# cursor.execute(query, tuple(values))
# conn.commit()
# flash("ITAT record added successfully!", "success")
# cursor.close()
# conn.close()
# return redirect(url_for('display_cit'))
# return render_template('add_cit.html')
# (You will also need to add update_cit and delete_cit functions later) # @app.route('/cit/delete/<int:id>', methods=['POST'])
# def delete_cit(id):
# try:
# conn = get_db_connection()
# cursor = conn.cursor()
# cursor.execute("DELETE FROM cit WHERE id=%s", (id,))
# conn.commit()
# flash("ITR record deleted successfully!", "success")
# except Exception as err:
# print(f"Error deleting CIT record: {err}")
# finally:
# cursor.close()
# conn.close()
# return redirect(url_for('display_cit'))
#
# ADD THESE FINAL FUNCTIONS FOR ITAT TO YOUR APP.PY FILE
#
## ======================================================= ## =======================================================
## ITAT (Income Tax Appellate Tribunal) Routes ## ITAT (Income Tax Appellate Tribunal) Routes
@@ -366,16 +410,89 @@ def delete_cit(id):
# DISPLAY all ITAT records # DISPLAY all ITAT records
@app.route('/itat_records') @app.route('/itat_records')
def display_itat(): def display_itat():
conn = get_db_connection() itat = ITATHandler()
cursor = conn.cursor(dictionary=True) records = itat.get_all_itat()
# Querying the 'itat' table itat.close()
cursor.execute("SELECT * FROM itat ORDER BY year DESC, id DESC")
records = cursor.fetchall()
cursor.close()
conn.close()
# Rendering the 'display_itat.html' template
return render_template('display_itat.html', records=records) return render_template('display_itat.html', records=records)
# ADD a new ITAT record
# @app.route('/itat/add', methods=['GET', 'POST'])
# def add_itat():
# cit = CITHandler()
# cit_records = cit.get_all_cit()
# cit.close()
# if request.method == 'POST':
# data = {
# "cit_id": request.form.get("cit_id"),
# "year": request.form.get("year"),
# "mat_tax_credit": request.form.get("mat_tax_credit"),
# "surcharge": request.form.get("surcharge"),
# "cess": request.form.get("cess"),
# "total_credit": request.form.get("total_credit")
# }
# itat = ITATHandler()
# itat.add_itat(data)
# itat.close()
# flash("ITAT Record Added Successfully!", "success")
# return redirect(url_for('display_itat'))
# return render_template('add_itat.html', cit_records=cit_records)
@app.route('/itat/delete/<int:id>', methods=['POST'])
def delete_itat(id):
itat = ITATHandler()
itat.delete_itat_by_id(id)
itat.close()
flash("ITAT Record Deleted!", "success")
return redirect(url_for('display_itat'))
@app.route('/itat/update/<int:id>', methods=['GET', 'POST'])
def update_itat(id):
itat = ITATHandler()
record = itat.get_itat_by_id(id)
if not record:
flash("Record Not Found!", "danger")
return redirect(url_for('display_itat'))
if request.method == 'POST':
data = {
"year": request.form.get("year"),
"mat_tax_credit": request.form.get("mat_tax_credit"),
"surcharge": request.form.get("surcharge"),
"cess": request.form.get("cess"),
"total_credit": request.form.get("total_credit")
}
itat.update_itat(id, data)
itat.close()
flash("ITAT Record Updated!", "success")
return redirect(url_for('display_itat'))
itat.close()
return render_template('update_itat.html', record=record)
# DISPLAY all ITAT records
# @app.route('/itat_records')
# def display_itat():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# # Querying the 'itat' table
# cursor.execute("SELECT * FROM itat ORDER BY year DESC, id DESC")
# records = cursor.fetchall()
# cursor.close()
# conn.close()
# # Rendering the 'display_itat.html' template
# return render_template('display_itat.html', records=records)
# ADD a new ITAT record # ADD a new ITAT record
@app.route('/itat/add', methods=['GET', 'POST']) @app.route('/itat/add', methods=['GET', 'POST'])
@@ -409,53 +526,53 @@ def add_itat():
return render_template('add_itat.html', cit_records=cit_records) return render_template('add_itat.html', cit_records=cit_records)
@app.route('/itat/update/<int:id>', methods=['GET', 'POST']) # @app.route('/itat/update/<int:id>', methods=['GET', 'POST'])
def update_itat(id): # def update_itat(id):
conn = get_db_connection() # conn = get_db_connection()
cursor = conn.cursor(dictionary=True) # cursor = conn.cursor(dictionary=True)
# Fetch the existing record # # Fetch the existing record
cursor.execute("SELECT * FROM itat WHERE id=%s", (id,)) # cursor.execute("SELECT * FROM itat WHERE id=%s", (id,))
record = cursor.fetchone() # record = cursor.fetchone()
if not record: # if not record:
cursor.close() # cursor.close()
conn.close() # conn.close()
flash("ITAT record not found!", "danger") # flash("ITAT record not found!", "danger")
return redirect(url_for('display_itat')) # return redirect(url_for('display_itat'))
if request.method == 'POST': # if request.method == 'POST':
columns = ['year', 'mat_tax_credit', 'surcharge', 'cess', 'total_credit'] # columns = ['year', 'mat_tax_credit', 'surcharge', 'cess', 'total_credit']
values = [request.form.get(col, 0) for col in columns] # values = [request.form.get(col, 0) for col in columns]
set_clause = ", ".join([f"{col}=%s" for col in columns]) # set_clause = ", ".join([f"{col}=%s" for col in columns])
query = f"UPDATE itat SET {set_clause} WHERE id=%s" # query = f"UPDATE itat SET {set_clause} WHERE id=%s"
cursor.execute(query, tuple(values) + (id,)) # cursor.execute(query, tuple(values) + (id,))
conn.commit() # conn.commit()
cursor.close() # cursor.close()
conn.close() # conn.close()
flash("ITAT record updated successfully!", "success") # flash("ITAT record updated successfully!", "success")
return redirect(url_for('display_itat')) # return redirect(url_for('display_itat'))
cursor.close() # cursor.close()
conn.close() # conn.close()
# Render a template with existing values filled in # # Render a template with existing values filled in
return render_template('update_itat.html', record=record) # return render_template('update_itat.html', record=record)
@app.route('/itat/delete/<int:id>', methods=['POST']) # @app.route('/itat/delete/<int:id>', methods=['POST'])
def delete_itat(id): # def delete_itat(id):
try: # try:
conn = get_db_connection() # conn = get_db_connection()
cursor = conn.cursor() # cursor = conn.cursor()
cursor.execute("DELETE FROM itat WHERE id=%s", (id,)) # cursor.execute("DELETE FROM itat WHERE id=%s", (id,))
conn.commit() # conn.commit()
flash("ITAT record deleted successfully!", "success") # flash("ITAT record deleted successfully!", "success")
except Exception as err: # except Exception as err:
flash(f"Error deleting ITAT: {err}", "danger") # flash(f"Error deleting ITAT: {err}", "danger")
finally: # finally:
cursor.close() # cursor.close()
conn.close() # conn.close()
return redirect(url_for('display_itat')) # return redirect(url_for('display_itat'))

103
test.py Normal file
View File

@@ -0,0 +1,103 @@
import os
from AppCode.Config import DBConfig
from AppCode.FileHandler import FileHandler
from werkzeug.utils import secure_filename
class DocumentHandler:
years = ""
documents = ""
isSuccess = False
resultMessage = ""
def View(self,request):
year = request.args.get('year')
stage = request.args.get('stage')
dbconfig = DBConfig()
connection = dbconfig.get_db_connection()
if not connection:
self.isSuccess = False
return
cursor = connection.cursor()
params = []
query = "SELECT * FROM documents WHERE 1=1"
if year:
query += " AND year = %s"
params.append(year)
if stage:
query += " AND stage = %s"
params.append(stage)
cursor.execute(query, params)
documentsdata = cursor.fetchall()
print("*************")
print(documentsdata)
cursor.callproc("GetYear")
# records = []
# for result in cursor.stored_results():
# records = result.fetchall()
yearsdata = ""
for res in cursor.stored_results():
yearsdata = res.fetchall()
print(yearsdata)
self.years = yearsdata
self.documents = documentsdata
self.isSuccess = True
print("document --",documentsdata)
cursor.close()
connection.close()
def Upload(self, request):
"""Log user actions with timestamp, user, action, and details."""
dbconfig = DBConfig()
connection = dbconfig.get_db_connection()
if connection:
cursor = connection.cursor()
files = request.files.getlist('documents')
year = request.form['year']
stage = request.form['stage']
for file in files:
if file is not FileHandler.ALLOWED_EXTENSIONS:
continue
filename = secure_filename(file.filename)
filepath = os.path.join(FileHandler.UPLOAD_FOLDER, filename)
extension = file.filename.rsplit('.', 1)[1]
# Need to Check whetehr all three items are required
file.save(filepath)
# cursor.execute("""
# INSERT INTO documents (filename, filepath, filetype, year, stage)
# VALUES (%s, %s, %s, %s, %s)
# """, (filename, filepath, file.filename.rsplit('.', 1)[1], year, stage))
cursor.callproc('InsertDocument', [
filename,
filepath,
extension,
year,
stage
])
connection.commit()
cursor.close()
connection.close()
# return redirect(url_for('view_documents'))
#return render_template('upload.html')