ITR model update code
This commit is contained in:
95
AppCode/AOHandler.py
Normal file
95
AppCode/AOHandler.py
Normal file
@@ -0,0 +1,95 @@
|
||||
from AppCode.Config import DBConfig
|
||||
import mysql.connector
|
||||
|
||||
|
||||
|
||||
class AOHandler:
|
||||
|
||||
def __init__(self):
|
||||
self.conn = DBConfig.get_db_connection()
|
||||
self.cursor = self.conn.cursor(dictionary=True)
|
||||
|
||||
|
||||
# GET ALL AO RECORDS using stored procedure "GetAllItr"
|
||||
def get_all_ao(self):
|
||||
|
||||
self.cursor.callproc("GetAllAO")
|
||||
records = []
|
||||
|
||||
for result in self.cursor.stored_results():
|
||||
records = result.fetchall()
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def get_ao_by_id(self, id):
|
||||
# Call stored procedure
|
||||
self.cursor.callproc('GetAORById', [id])
|
||||
|
||||
# Fetch result
|
||||
records = []
|
||||
for result in self.cursor.stored_results():
|
||||
records = result.fetchall()
|
||||
|
||||
if records:
|
||||
print(records[0])
|
||||
return records[0] # return single record
|
||||
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
# INSERT ITR RECORD using procedure "add_itr"
|
||||
# def add_itr(self, data):
|
||||
|
||||
# columns = [
|
||||
# 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
|
||||
# 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
|
||||
# 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
|
||||
# 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
|
||||
# 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
|
||||
# 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
|
||||
# ]
|
||||
|
||||
# values = [data.get(col, 0) for col in columns]
|
||||
|
||||
# # Call your stored procedure
|
||||
# self.cursor.callproc("InsertITR", values)
|
||||
# self.conn.commit()
|
||||
|
||||
|
||||
# UPDATE ITR RECORD by ITR id
|
||||
# def update(self, id, data):
|
||||
|
||||
# columns = [
|
||||
# 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
|
||||
# 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
|
||||
# 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
|
||||
# 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
|
||||
# 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
|
||||
# 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
|
||||
# ]
|
||||
|
||||
# set_clause = ", ".join([f"{col}=%s" for col in columns])
|
||||
|
||||
# query = f"UPDATE itr SET {set_clause} WHERE id = %s"
|
||||
|
||||
# values = [data.get(col, 0) for col in columns]
|
||||
# values.append(id)
|
||||
|
||||
# self.cursor.execute(query, tuple(values))
|
||||
# self.conn.commit()
|
||||
|
||||
|
||||
# # DELETE RECORD by ITR id
|
||||
# def delete_itr_by_id(self, id):
|
||||
# # Call the stored procedure
|
||||
# self.cursor.callproc('DeleteITRById', [id])
|
||||
# self.conn.commit()
|
||||
|
||||
|
||||
# CLOSE CONNECTION
|
||||
def close(self):
|
||||
self.cursor.close()
|
||||
self.conn.close()
|
||||
21
AppCode/Config.py
Normal file
21
AppCode/Config.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import mysql.connector
|
||||
import os
|
||||
|
||||
class DBConfig:
|
||||
# Database credentials (can also be read from environment variables)
|
||||
MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
|
||||
MYSQL_USER = os.getenv("MYSQL_USER", "root")
|
||||
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "root")
|
||||
MYSQL_DB = os.getenv("MYSQL_DB", "income_tax")
|
||||
|
||||
@staticmethod
|
||||
def get_db_connection():
|
||||
"""
|
||||
Returns a MySQL connection object.
|
||||
"""
|
||||
return mysql.connector.connect(
|
||||
host=DBConfig.MYSQL_HOST,
|
||||
user=DBConfig.MYSQL_USER,
|
||||
password=DBConfig.MYSQL_PASSWORD,
|
||||
database=DBConfig.MYSQL_DB
|
||||
)
|
||||
99
AppCode/DocumentHandler.py
Normal file
99
AppCode/DocumentHandler.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import os
|
||||
from AppCode.Config import DBConfig
|
||||
from AppCode.FileHandler import FileHandler
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
class DocumentHandler:
|
||||
|
||||
def __init__(self):
|
||||
self.years = []
|
||||
self.documents = []
|
||||
self.isSuccess = False
|
||||
self.resultMessage = ""
|
||||
|
||||
# VIEW DOCUMENTS
|
||||
def View(self, request):
|
||||
year = request.args.get('year', '')
|
||||
stage = request.args.get('stage', '')
|
||||
|
||||
dbconfig = DBConfig()
|
||||
connection = dbconfig.get_db_connection()
|
||||
|
||||
if not connection:
|
||||
self.isSuccess = False
|
||||
return
|
||||
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
|
||||
# --- FILTER QUERY ---
|
||||
query = "SELECT * FROM documents WHERE 1=1"
|
||||
params = []
|
||||
|
||||
if year != "":
|
||||
query += " AND year = %s"
|
||||
params.append(year)
|
||||
|
||||
if stage != "":
|
||||
query += " AND stage = %s"
|
||||
params.append(stage)
|
||||
|
||||
|
||||
cursor.execute(query, params)
|
||||
self.documents = cursor.fetchall()
|
||||
|
||||
# ---- GET YEARS FROM STORED PROCEDURE ----
|
||||
cursor.callproc("GetYear")
|
||||
|
||||
for result in cursor.stored_results():
|
||||
year_rows = result.fetchall()
|
||||
break # only first result set
|
||||
|
||||
self.years = [row['year'] for row in year_rows]
|
||||
|
||||
cursor.close()
|
||||
connection.close()
|
||||
self.isSuccess = True
|
||||
|
||||
# UPLOAD DOCUMENTS
|
||||
def Upload(self, request):
|
||||
"""Log user actions with timestamp, user, action, and details."""
|
||||
|
||||
dbconfig = DBConfig()
|
||||
connection = dbconfig.get_db_connection()
|
||||
|
||||
if connection:
|
||||
cursor = connection.cursor()
|
||||
files = request.files.getlist('documents')
|
||||
year = request.form['year']
|
||||
stage = request.form['stage']
|
||||
|
||||
for file in files:
|
||||
if file is not FileHandler.ALLOWED_EXTENSIONS:
|
||||
continue
|
||||
|
||||
|
||||
filename = secure_filename(file.filename)
|
||||
filepath = os.path.join(FileHandler.UPLOAD_FOLDER, filename)
|
||||
extension = file.filename.rsplit('.', 1)[1]
|
||||
# Need to Check whetehr all three items are required
|
||||
|
||||
file.save(filepath)
|
||||
|
||||
# cursor.execute("""
|
||||
# INSERT INTO documents (filename, filepath, filetype, year, stage)
|
||||
# VALUES (%s, %s, %s, %s, %s)
|
||||
# """, (filename, filepath, file.filename.rsplit('.', 1)[1], year, stage))
|
||||
|
||||
|
||||
cursor.callproc('InsertDocument', [
|
||||
filename,
|
||||
filepath,
|
||||
extension,
|
||||
year,
|
||||
stage
|
||||
])
|
||||
|
||||
connection.commit()
|
||||
cursor.close()
|
||||
connection.close()
|
||||
# return redirect(url_for('view_documents'))
|
||||
12
AppCode/FileHandler.py
Normal file
12
AppCode/FileHandler.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import os
|
||||
|
||||
class FileHandler:
|
||||
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'doc', 'xlsx', 'xls'}
|
||||
UPLOAD_FOLDER = os.path.join('static', 'uploads')
|
||||
|
||||
|
||||
@staticmethod
|
||||
def CHeckExistingOrCreateNewUploadFolder():
|
||||
#Wheteher path exists
|
||||
os.makedirs(FileHandler.UPLOAD_FOLDER, exist_ok=True)
|
||||
return
|
||||
96
AppCode/ITRHandler.py
Normal file
96
AppCode/ITRHandler.py
Normal file
@@ -0,0 +1,96 @@
|
||||
from AppCode.Config import DBConfig
|
||||
import mysql.connector
|
||||
|
||||
|
||||
|
||||
class ITRHandler:
|
||||
|
||||
def __init__(self):
|
||||
self.conn = DBConfig.get_db_connection()
|
||||
self.cursor = self.conn.cursor(dictionary=True)
|
||||
|
||||
|
||||
# GET ALL ITR RECORDS using stored procedure "GetAllItr"
|
||||
def get_all_itr(self):
|
||||
# self.cursor = conn.cursor(dictionary=True)
|
||||
|
||||
self.cursor.callproc("GetAllItr")
|
||||
records = []
|
||||
|
||||
for result in self.cursor.stored_results():
|
||||
records = result.fetchall()
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def get_itr_by_id(self, id):
|
||||
# Call stored procedure
|
||||
self.cursor.callproc('GetITRById', [id])
|
||||
|
||||
# Fetch result
|
||||
records = []
|
||||
for result in self.cursor.stored_results():
|
||||
records = result.fetchall()
|
||||
|
||||
if records:
|
||||
print(records[0])
|
||||
return records[0] # return single record
|
||||
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
# INSERT ITR RECORD using procedure "add_itr"
|
||||
def add_itr(self, data):
|
||||
|
||||
columns = [
|
||||
'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
|
||||
'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
|
||||
'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
|
||||
'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
|
||||
'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
|
||||
'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
|
||||
]
|
||||
|
||||
values = [data.get(col, 0) for col in columns]
|
||||
|
||||
# Call your stored procedure
|
||||
self.cursor.callproc("InsertITR", values)
|
||||
self.conn.commit()
|
||||
|
||||
|
||||
# UPDATE ITR RECORD by ITR id
|
||||
def update(self, id, data):
|
||||
|
||||
columns = [
|
||||
'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
|
||||
'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
|
||||
'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
|
||||
'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
|
||||
'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
|
||||
'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
|
||||
]
|
||||
|
||||
set_clause = ", ".join([f"{col}=%s" for col in columns])
|
||||
|
||||
query = f"UPDATE itr SET {set_clause} WHERE id = %s"
|
||||
|
||||
values = [data.get(col, 0) for col in columns]
|
||||
values.append(id)
|
||||
|
||||
self.cursor.execute(query, tuple(values))
|
||||
self.conn.commit()
|
||||
|
||||
|
||||
# DELETE RECORD by ITR id
|
||||
def delete_itr_by_id(self, id):
|
||||
# Call the stored procedure
|
||||
self.cursor.callproc('DeleteITRById', [id])
|
||||
self.conn.commit()
|
||||
|
||||
|
||||
# CLOSE CONNECTION
|
||||
def close(self):
|
||||
self.cursor.close()
|
||||
self.conn.close()
|
||||
25
AppCode/Log.py
Normal file
25
AppCode/Log.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
|
||||
from flask import current_app
|
||||
|
||||
from datetime import datetime
|
||||
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
|
||||
|
||||
import os
|
||||
|
||||
class LogHelper:
|
||||
@staticmethod
|
||||
def log_action(action, details=""):
|
||||
"""Log user actions with timestamp, user, action, and details."""
|
||||
logData = LogData()
|
||||
logData.WriteLog(action, details="")
|
||||
|
||||
|
||||
|
||||
class LogData:
|
||||
|
||||
filepath = ""
|
||||
timestamp = None
|
||||
|
||||
def __init__(self):
|
||||
self.filepath = os.path.join(current_app.root_path, 'activity.log')
|
||||
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
58
AppCode/Utiliies.py
Normal file
58
AppCode/Utiliies.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from flask import flash, jsonify, json
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class RegEx:
|
||||
patternAlphabetOnly = "^[A-Za-z ]+$"
|
||||
|
||||
|
||||
class ResponseHandler:
|
||||
@staticmethod
|
||||
def invalid_name(entity):
|
||||
return {'status': 'error', 'message': f'Invalid {entity} name. Only letters are allowed!'}
|
||||
|
||||
@staticmethod
|
||||
def already_exists(entity):
|
||||
return {'status': 'exists', 'message': f'{entity.capitalize()} already exists!'}
|
||||
|
||||
@staticmethod
|
||||
def add_success(entity):
|
||||
return {'status': 'success', 'message': f'{entity.capitalize()} added successfully!'}
|
||||
|
||||
@staticmethod
|
||||
def add_failure(entity):
|
||||
return {'status': 'error', 'message': f'Failed to add {entity}.'}
|
||||
|
||||
@staticmethod
|
||||
def is_available(entity):
|
||||
return {'status': 'available', 'message': f'{entity.capitalize()} name is available!'}
|
||||
|
||||
@staticmethod
|
||||
def delete_success(entity):
|
||||
return {'status': 'success', 'message': f'{entity.capitalize()} deleted successfully!'}
|
||||
|
||||
@staticmethod
|
||||
def delete_failure(entity):
|
||||
return {'status': 'error', 'message': f'Failed to delete {entity}.'}
|
||||
|
||||
@staticmethod
|
||||
def update_success(entity):
|
||||
return {'status': 'success', 'message': f'{entity.capitalize()} updated successfully!'}
|
||||
|
||||
@staticmethod
|
||||
def update_failure(entity):
|
||||
return {'status': 'error', 'message': f'Failed to update {entity}.'}
|
||||
|
||||
@staticmethod
|
||||
def fetch_failure(entity):
|
||||
return {'status': 'error', 'message': f'Failed to fetch {entity}.'}
|
||||
|
||||
|
||||
class HtmlHelper:
|
||||
# Helper: JSON Response Formatter
|
||||
|
||||
@staticmethod
|
||||
def json_response(message_obj, status_code):
|
||||
return jsonify(message_obj), status_code
|
||||
#May need to refactor further
|
||||
|
||||
BIN
AppCode/__pycache__/AOHandler.cpython-313.pyc
Normal file
BIN
AppCode/__pycache__/AOHandler.cpython-313.pyc
Normal file
Binary file not shown.
BIN
AppCode/__pycache__/Config.cpython-313.pyc
Normal file
BIN
AppCode/__pycache__/Config.cpython-313.pyc
Normal file
Binary file not shown.
BIN
AppCode/__pycache__/DocumentHandler.cpython-313.pyc
Normal file
BIN
AppCode/__pycache__/DocumentHandler.cpython-313.pyc
Normal file
Binary file not shown.
BIN
AppCode/__pycache__/FileHandler.cpython-313.pyc
Normal file
BIN
AppCode/__pycache__/FileHandler.cpython-313.pyc
Normal file
Binary file not shown.
BIN
AppCode/__pycache__/ITRHandler.cpython-313.pyc
Normal file
BIN
AppCode/__pycache__/ITRHandler.cpython-313.pyc
Normal file
Binary file not shown.
Reference in New Issue
Block a user