6 Commits

16 changed files with 891 additions and 896 deletions

3
.gitignore vendored
View File

@@ -3,5 +3,4 @@ venv/
__pycache__/
static/downloads/
static/uploads/
static/uploads/

View File

@@ -2,35 +2,27 @@ from flask import Blueprint, render_template, request, redirect, url_for, jsonif
from flask_login import login_required
import config
from model.State import State
from model.Block import Block
from model.Utilities import HtmlHelper
block_bp = Blueprint('block', __name__)
#
# --- Add Block page -------
@block_bp.route('/add_block', methods=['GET', 'POST'])
@login_required
def add_block():
block = Block()
if request.method == 'POST':
block.AddBlock(request)
return block.resultMessage
connection = config.get_db_connection()
cursor = connection.cursor()
cursor.callproc("GetAllStates")
for rs in cursor.stored_results():
states = rs.fetchall()
state = State()
states = state.GetAllStates(request=request)
block_data = block.GetAllBlocks(request)
cursor.close()
connection.close()
return render_template(
'add_block.html',
states=states,

View File

@@ -27,8 +27,6 @@ def upload():
f"User {current_user.id} Upload Excel File '{file.filename}'"
)
return redirect(url_for('excel.show_table', filename=file.filename))
else:
return redirect(url_for('upload_excel_file'))
return render_template('uploadExcelFile.html')

View File

@@ -1,70 +1,46 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required, current_user
from model.gst_release import GSTReleasemodel
from flask import Blueprint, render_template, request, redirect, url_for
from flask_login import login_required
from model.gst_release import GSTRelease
from model.Log import LogHelper
from flask import flash, current_app
gst_release_bp = Blueprint('gst_release_bp', __name__)
gst_service = GSTRelease()
# ------------------- Add GST Release -------------------
# ---------------- ADD GST RELEASE ----------------
@gst_release_bp.route('/add_gst_release', methods=['GET', 'POST'])
@login_required
def add_gst_release():
gst_releases_dict = GSTReleasemodel.fetch_all_gst_releases()
gst_releases = [
[
g['GST_Release_Id'], g['PMC_No'], g['invoice_no'],
g['Basic_Amount'], g['Final_Amount'], g['Total_Amount'], g['UTR']
] for g in gst_releases_dict
] if gst_releases_dict else []
if request.method == 'POST':
pmc_no = request.form['PMC_No']
invoice_no = request.form['invoice_No']
basic_amount = request.form['basic_amount']
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
contractor_id = request.form['subcontractor_id']
LogHelper.log_action("Add GST Release", f"User {current_user.id} added GST release '{pmc_no}'")
GSTReleasemodel.insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id)
gst_service.AddGSTRelease(request)
LogHelper.log_action("Add GST Release", f"User added GST release")
flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('add_gst_release.html', gst_releases=gst_releases, invoices=[])
gst_releases = gst_service.GetAllGSTReleases()
return render_template('add_gst_release.html', gst_releases=gst_releases)
# ------------------- Edit GST Release -------------------
# ---------------- EDIT GST RELEASE ----------------
@gst_release_bp.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def edit_gst_release(gst_release_id):
gst_release_data = GSTReleasemodel.fetch_gst_release_by_id(gst_release_id)
if not gst_release_data:
gst_data = gst_service.GetGSTReleaseByID(gst_release_id)
if not gst_data:
return "GST Release not found", 404
if request.method == 'POST':
pmc_no = request.form['PMC_No']
invoice_no = request.form['invoice_No']
basic_amount = request.form['basic_amount']
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
LogHelper.log_action("Edit GST Release", f"User {current_user.id} edited GST release '{pmc_no}'")
GSTReleasemodel.update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr)
gst_service.EditGSTRelease(request, gst_release_id)
LogHelper.log_action("Edit GST Release", f"User edited GST release")
flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('edit_gst_release.html', gst_release_data=gst_release_data, invoices=[])
return render_template('edit_gst_release.html', gst_release_data=gst_data)
# ------------------- Delete GST Release -------------------
# ---------------- DELETE GST RELEASE ----------------
@gst_release_bp.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def delete_gst_release(gst_release_id):
success, utr = GSTReleasemodel.delete_gst_release(gst_release_id)
if not success:
return jsonify({"message": "GST Release not found or failed to delete", "status": "error"}), 404
LogHelper.log_action("Delete GST Release", f"User {current_user.id} deleted GST release '{gst_release_id}'")
return jsonify({"message": f"GST Release {gst_release_id} deleted successfully.", "status": "success"}), 200
gst_service.DeleteGSTRelease(gst_release_id) # remove request
LogHelper.log_action("Delete GST Release", f"User deleted GST release")
flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
return redirect(url_for('gst_release_bp.add_gst_release'))

View File

@@ -1,98 +1,101 @@
# controllers/invoice_controller.py
from flask import Blueprint, request, jsonify, render_template
from flask_login import login_required, current_user
from model.Invoice import *
from model.Log import LogHelper
from model.Log import LogHelper
invoice_bp = Blueprint('invoice', __name__)
# -------------------------------- Add Invoice ---------------------------------
@invoice_bp.route('/add_invoice', methods=['GET', 'POST'])
@login_required
def add_invoice():
if request.method == 'POST':
# ------------------------------- Helpers -------------------------------
def handle_exception(func):
"""Decorator to handle exceptions and return JSON error responses."""
def wrapper(*args, **kwargs):
try:
village_name = request.form.get('village')
village_result = get_village_id(village_name)
if not village_result:
return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
village_id = village_result['Village_Id']
data = request.form
invoice_id = insert_invoice(data, village_id)
assign_subcontractor(data, village_id)
insert_hold_types(data, invoice_id)
LogHelper.log_action("Add invoice", f"User {current_user.id} Added invoice '{data.get('pmc_no')}'")
return jsonify({"status": "success", "message": "Invoice added successfully"}), 201
return func(*args, **kwargs)
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
wrapper.__name__ = func.__name__
return wrapper
def log_action(action: str, detail: str):
LogHelper.log_action(action, f"User {current_user.id} {detail}")
# ------------------------------- Add Invoice -------------------------------
@invoice_bp.route('/add_invoice', methods=['GET', 'POST'])
@login_required
@handle_exception
def add_invoice():
if request.method == 'POST':
data = request.form
village_name = data.get('village')
village_result = get_village_id(village_name)
if not village_result:
return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
village_id = village_result['Village_Id']
invoice_id = insert_invoice(data, village_id)
assign_subcontractor(data, village_id)
insert_hold_types(data, invoice_id)
log_action("Add invoice", f"added invoice '{data.get('pmc_no')}'")
return jsonify({"status": "success", "message": "Invoice added successfully"}), 201
invoices = get_all_invoice_details()
villages = get_all_villages()
return render_template('add_invoice.html', invoices=invoices, villages=villages)
# ------------------- Search Subcontractor -------------------
# ------------------------------- Search Subcontractor -------------------------------
@invoice_bp.route('/search_subcontractor', methods=['POST'])
@login_required
@handle_exception
def search_subcontractor():
sub_query = request.form.get("query")
results = search_contractors(sub_query)
query = request.form.get("query", "").strip()
results = search_contractors(query)
if not results:
return "<li>No subcontractor found</li>"
output = "".join(
f"<li data-id='{row['Contractor_Id']}'>{row['Contractor_Name']}</li>"
for row in results
)
return output
return "".join(f"<li data-id='{r['Contractor_Id']}'>{r['Contractor_Name']}</li>" for r in results)
# ------------------- Get Hold Types -------------------
# ------------------------------- Get Hold Types -------------------------------
@invoice_bp.route('/get_hold_types', methods=['GET'])
@login_required
@handle_exception
def get_hold_types():
hold_types = get_all_hold_types()
LogHelper.log_action("Get hold type", f"User {current_user.id} Get hold type '{hold_types}'")
log_action("Get hold type", f"retrieved hold types '{hold_types}'")
return jsonify(hold_types)
# ------------------- Edit Invoice -------------------
# ------------------------------- Edit Invoice -------------------------------
@invoice_bp.route('/edit_invoice/<int:invoice_id>', methods=['GET', 'POST'])
@login_required
@handle_exception
def edit_invoice(invoice_id):
if request.method == 'POST':
data = request.form
update_invoice(data, invoice_id)
update_inpayment(data)
LogHelper.log_action("Edit invoice", f"User {current_user.id} Edit invoice '{invoice_id}'")
log_action("Edit invoice", f"edited invoice '{invoice_id}'")
return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200
invoice = get_invoice_by_id(invoice_id)
return render_template('edit_invoice.html', invoice=invoice)
# ------------------- Delete Invoice -------------------
# ------------------------------- Delete Invoice -------------------------------
@invoice_bp.route('/delete_invoice/<int:invoice_id>', methods=['GET'])
@login_required
@handle_exception
def delete_invoice_route(invoice_id):
try:
delete_invoice_data(invoice_id, current_user.id)
LogHelper.log_action("Delete Invoice", f"User {current_user.id} deleted Invoice '{invoice_id}'")
return jsonify({
"message": f"Invoice {invoice_id} deleted successfully.",
"status": "success"
})
except Exception as e:
return jsonify({
"message": str(e),
"status": "error"
}), 500
delete_invoice_data(invoice_id, current_user.id)
log_action("Delete invoice", f"deleted invoice '{invoice_id}'")
return jsonify({"status": "success", "message": f"Invoice {invoice_id} deleted successfully."})

View File

@@ -1,4 +1,4 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask import Blueprint, render_template, request, redirect, url_for, jsonify, flash
from flask_login import login_required, current_user
from model.payment import Paymentmodel
from model.Log import LogHelper
@@ -88,16 +88,14 @@ def edit_payment(payment_id):
# ------------------- Delete Payment -------------------
@payment_bp.route('/delete_payment/<int:payment_id>', methods=['GET', 'POST'])
@payment_bp.route('/delete_payment/<int:payment_id>', methods=['POST'])
@login_required
def delete_payment(payment_id):
success, pmc_no, invoice_no = Paymentmodel.delete_payment(payment_id)
if not success:
return jsonify({"message": "Payment not found or failed to delete", "status": "error"}), 404
flash("Payment not found or failed to delete", "error")
else:
LogHelper.log_action("Delete Payment", f"User {current_user.id} deleted Payment '{payment_id}'")
flash(f"Payment ID {payment_id} deleted successfully.", "success")
LogHelper.log_action("Delete Payment", f"User {current_user.id} deleted Payment '{payment_id}'")
return jsonify({
"message": f"Payment ID {payment_id} deleted successfully.",
"status": "success"
}), 200
return redirect(url_for('payment_bp.add_payment'))

View File

@@ -1,34 +1,11 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required
from model.Subcontractor import Subcontractor
from model.Utilities import HtmlHelper, ResponseHandler
subcontractor_bp = Blueprint('subcontractor', __name__)
# ----------------------------------------------------------
# Helpers (unchanged)
# ----------------------------------------------------------
class HtmlHelper:
@staticmethod
def json_response(data, status=200):
return jsonify(data), status
class ResponseHandler:
@staticmethod
def fetch_failure(entity):
return {"status": "error", "message": f"Failed to fetch {entity}"}
@staticmethod
def add_failure(entity):
return {"status": "error", "message": f"Failed to add {entity}"}
@staticmethod
def update_failure(entity):
return {"status": "error", "message": f"Failed to update {entity}"}
@staticmethod
def delete_failure(entity):
return {"status": "error", "message": f"Failed to delete {entity}"}
# ----------------------------------------------------------
# LIST + ADD

View File

@@ -1,8 +1,11 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required
import config
from model.Village import Village
from model.State import State
@@ -23,7 +26,6 @@ def add_village():
state = State()
states = state.GetAllStates(request=request)
villages = village.GetAllVillages(request=request)
return render_template(
@@ -42,7 +44,6 @@ def get_districts(state_id):
cursor = connection.cursor()
cursor.callproc("GetDistrictByStateID", [state_id])
districts = []
for rs in cursor.stored_results():
@@ -51,15 +52,7 @@ def get_districts(state_id):
cursor.close()
connection.close()
district_list = []
for d in districts:
district_list.append({
"id": d[0],
"name": d[1]
})
return jsonify(district_list)
return jsonify([{"id": d[0], "name": d[1]} for d in districts])
# ------------------------- Fetch Blocks -------------------------
@@ -71,7 +64,6 @@ def get_blocks(district_id):
cursor = connection.cursor()
cursor.callproc("GetBlocksByDistrictID", [district_id])
blocks = []
for rs in cursor.stored_results():
@@ -80,22 +72,13 @@ def get_blocks(district_id):
cursor.close()
connection.close()
block_list = []
for b in blocks:
block_list.append({
"id": b[0],
"name": b[1]
})
return jsonify(block_list)
return jsonify([{"id": b[0], "name": b[1]} for b in blocks])
# ------------------------- Check Village -------------------------
@village_bp.route('/check_village', methods=['POST'])
@login_required
def check_village():
village = Village()
return village.CheckVillage(request=request)
@@ -106,14 +89,9 @@ def check_village():
def delete_village(village_id):
village = Village()
village.DeleteVillage(request=request, village_id=village_id)
if not village.isSuccess:
flash(village.resultMessage, "error")
else:
flash(village.resultMessage, "success")
flash(village.resultMessage, "success" if village.isSuccess else "error")
return redirect(url_for('village.add_village'))
@@ -135,8 +113,8 @@ def edit_village(village_id):
else:
flash(village.resultMessage, "error")
village_data = village.GetVillageByID(request=request, id=village_id)
blocks = village.GetAllBlocks(request=request)
village_data = village.GetVillageByID(id=village_id) or []
blocks = village.GetAllBlocks() or []
return render_template(
'edit_village.html',
@@ -145,23 +123,17 @@ def edit_village(village_id):
)
else:
village_data = village.GetVillageByID(request=request, id=village_id)
# ✅ FIXED HERE (removed request)
village_data = village.GetVillageByID(id=village_id)
if not village.isSuccess:
flash(village.resultMessage, "error")
return redirect(url_for('village.add_village'))
blocks = village.GetAllBlocks(request=request)
if village_data is None:
village_data = []
if blocks is None:
blocks = []
blocks = village.GetAllBlocks() or []
return render_template(
'edit_village.html',
village_data=village_data,
village_data=village_data or [],
blocks=blocks
)

View File

@@ -1,72 +1,65 @@
import mysql.connector
from mysql.connector import Error
import config
import openpyxl
import os
import re
import ast
from datetime import datetime
class ContractorInfo:
ID = ""
contInfo = None
def __init__(self, id):
self.ID = id
print(id)
def __init__(self, contractor_id):
self.ID = contractor_id
self.contInfo = None
self.fetchData()
def fetchData(self):
"""Fetch basic contractor info by ID."""
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
cursor.callproc('GetContractorInfoById', [self.ID])
for result in cursor.stored_results():
self.contInfo = result.fetchone()
print(self.contInfo,flush=True)
with connection.cursor(dictionary=True, buffered=True) as cursor:
cursor.callproc('GetContractorInfoById', [self.ID])
# Get the first result set
for result in cursor.stored_results():
self.contInfo = result.fetchone()
except Error as e:
print(f"Error fetching contractor info: {e}")
finally:
cursor.close()
connection.close()
if connection.is_connected():
connection.close()
def fetchalldata(self):
"""Fetch hold types and invoices for contractor."""
data = {}
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
print("here", flush=True)
with connection.cursor(dictionary=True, buffered=True) as cursor:
# Fetch Hold Types
cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
data['hold_types'] = hold_type_map
# ---------------- Hold Types ----------------
cursor = connection.cursor(dictionary=True)
# Fetch Invoices
cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# ---------------- Invoices ----------------
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
# Remove duplicate invoices
invoice_ids_seen = set()
unique_invoices = []
for inv in invoices:
if inv["Invoice_Id"] not in invoice_ids_seen:
invoice_ids_seen.add(inv["Invoice_Id"])
unique_invoices.append(inv)
invoices = unique_invoices
# Remove duplicate invoices
seen_ids = set()
unique_invoices = []
for inv in invoices:
if inv['Invoice_Id'] not in seen_ids:
seen_ids.add(inv['Invoice_Id'])
unique_invoices.append(inv)
data['invoices'] = unique_invoices
except Error as e:
print(f"Error fetching contractor data: {e}")
finally:
cursor.close()
connection.close()
if connection.is_connected():
connection.close()
return data

View File

@@ -1,55 +1,51 @@
import config
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GST:
@staticmethod
def get_unreleased_gst():
# Use ItemCRUD for Invoices
invoice_crud = ItemCRUD(itemType=ItemCRUDType.Invoice)
invoices_rows = invoice_crud.GetAllData(storedproc="GetAllInvoicesBasic")
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
if not invoice_crud.isSuccess:
return [] # Could also log invoice_crud.resultMessage
try:
# ----------- Invoices -----------
cursor.callproc('GetAllInvoicesBasic')
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
invoices = [
dict(
Invoice_No=row[1],
GST_SD_Amount=float(row[2]) if row[2] is not None else 0
)
for row in invoices_rows
]
# Use ItemCRUD for GST Releases
gst_crud = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst_rows = gst_crud.GetAllData(storedproc="GetAllGSTReleasesBasic")
# ----------- GST Releases -----------
cursor.callproc('GetAllGSTReleasesBasic')
gst_releases = []
for result in cursor.stored_results():
gst_releases = result.fetchall()
if not gst_crud.isSuccess:
return [] # Could also log gst_crud.resultMessage
gst_invoice_nos = {
g['Invoice_No']
for g in gst_releases
if g['Invoice_No']
}
gst_invoice_nos = {
g[2] # Invoice_No is at index 2
for g in gst_rows
if g[2]
}
gst_basic_amounts = {
float(g['Basic_Amount'])
for g in gst_releases
if g['Basic_Amount'] is not None
}
gst_basic_amounts = {
float(g[3]) # Basic_Amount at index 3
for g in gst_rows
if g[3] is not None
}
unreleased = []
# Filter unreleased invoices
unreleased = []
for inv in invoices:
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
match_by_gst_amount = inv['GST_SD_Amount'] in gst_basic_amounts
for inv in invoices:
if not (match_by_invoice or match_by_gst_amount):
unreleased.append(inv)
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
match_by_gst_amount = float(
inv.get('GST_SD_Amount') or 0
) in gst_basic_amounts
if not (match_by_invoice or match_by_gst_amount):
unreleased.append(inv)
return unreleased
finally:
cursor.close()
connection.close()
return unreleased

View File

@@ -1,35 +1,76 @@
import config
import mysql.connector
# ------------------- Helper -------------------
# ------------------- Helper Functions -------------------
def clear_results(cursor):
"""Consume all stored results to prevent cursor issues."""
for r in cursor.stored_results():
r.fetchall()
def fetch_one(cursor):
"""Fetch first row from stored results."""
for result in cursor.stored_results():
return result.fetchone()
return None
# ------------------- Get Village Id -------------------
def get_village_id(village_name):
def fetch_all(cursor):
"""Fetch all rows from stored results."""
data = []
for result in cursor.stored_results():
data.extend(result.fetchall())
return data
def get_numeric_values(data):
"""Return numeric fields for invoices safely."""
return [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
def execute_db_operation(operation_func):
"""General DB operation wrapper with commit/rollback."""
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetVillageIdByName", (village_name,))
village_result = None
for rs in cursor.stored_results():
village_result = rs.fetchone()
cursor.close()
connection.close()
return village_result
# ------------------- Insert Invoice -------------------
def insert_invoice(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
# 1. Insert Invoice
result = operation_func(cursor)
connection.commit()
return result
except Exception:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
# ------------------- Village Functions -------------------
def get_village_id(village_name):
def operation(cursor):
cursor.callproc("GetVillageIdByName", (village_name,))
return fetch_one(cursor)
return execute_db_operation(operation)
def get_all_villages():
def operation(cursor):
cursor.callproc("GetAllVillages")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Invoice Functions -------------------
def insert_invoice(data, village_id):
def operation(cursor):
# Insert invoice
cursor.callproc('InsertInvoice', [
data.get('pmc_no'),
village_id,
@@ -37,29 +78,14 @@ def insert_invoice(data, village_id):
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0)
*get_numeric_values(data)
])
invoice_id = None
for result in cursor.stored_results():
row = result.fetchone()
if row:
invoice_id = row.get('invoice_id')
if not invoice_id:
invoice_row = fetch_one(cursor)
if not invoice_row:
raise Exception("Invoice ID not returned")
invoice_id = invoice_row.get('invoice_id')
# 2. Insert Inpayment
# Insert inpayment
cursor.callproc('InsertInpayment', [
data.get('pmc_no'),
village_id,
@@ -67,221 +93,41 @@ def insert_invoice(data, village_id):
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
*get_numeric_values(data),
data.get('subcontractor_id')
])
clear_results(cursor)
connection.commit()
return invoice_id
except Exception as e:
connection.rollback()
raise e
return execute_db_operation(operation)
finally:
cursor.close()
connection.close()
# ------------------- Assign Subcontractor -------------------
def assign_subcontractor(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Insert Hold Types -------------------
def insert_hold_types(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = None
for result in cursor.stored_results():
hold_type_result = result.fetchone()
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
hold_amount = float(hold_amount or 0)
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
hold_amount
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Get All Invoices -------------------
def get_all_invoice_details():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
def operation(cursor):
cursor.callproc('GetAllInvoiceDetails')
return fetch_all(cursor)
return execute_db_operation(operation)
cursor.callproc('GetAllInvoiceDetails')
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.close()
connection.close()
return invoices
# ------------------- Get All Villages -------------------
def get_all_villages():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllVillages")
villages = []
for result in cursor.stored_results():
villages = result.fetchall()
cursor.close()
connection.close()
return villages
# ------------------- Search Contractors -------------------
def search_contractors(sub_query):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('SearchContractorsByName', [sub_query])
results = []
for result in cursor.stored_results():
results = result.fetchall()
cursor.close()
connection.close()
return results
# ------------------- Get All Hold Types -------------------
def get_all_hold_types():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllHoldTypes")
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
cursor.close()
connection.close()
return hold_types
# ------------------- Get Invoice By Id -------------------
def get_invoice_by_id(invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
def operation(cursor):
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = fetch_one(cursor)
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = None
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = fetch_all(cursor)
for result in cursor.stored_results():
invoice = result.fetchone()
if invoice:
invoice["hold_amounts"] = hold_amounts
return invoice
return execute_db_operation(operation)
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = []
for result in cursor.stored_results():
hold_amounts = result.fetchall()
if invoice:
invoice["hold_amounts"] = hold_amounts
cursor.close()
connection.close()
return invoice
# ------------------- Update Invoice -------------------
def update_invoice(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
def operation(cursor):
cursor.callproc("GetVillageIdByName", (data.get('village'),))
village = None
for rs in cursor.stored_results():
village = rs.fetchone()
village = fetch_one(cursor)
if not village:
raise Exception("Village not found")
village_id = village['Village_Id']
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
cursor.callproc('UpdateInvoice', [
data.get('pmc_no'),
village_id,
@@ -289,91 +135,101 @@ def update_invoice(data, invoice_id):
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
*numeric,
*get_numeric_values(data),
invoice_id
])
clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Update Inpayment -------------------
def update_inpayment(data):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
def operation(cursor):
cursor.callproc('UpdateInpayment', [
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
*numeric,
*get_numeric_values(data),
data.get('pmc_no'),
data.get('invoice_no')
])
clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Delete Invoice -------------------
def delete_invoice_data(invoice_id, user_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetInvoicePMCById', [invoice_id])
def operation(cursor):
# Fetch PMC and Invoice_No from DB
cursor.callproc('GetInvoicePMCById', (invoice_id,))
record = {}
for result in cursor.stored_results():
record = result.fetchone() or {}
if not record:
raise Exception("Invoice not found")
# Use exact DB keys
pmc_no = record['PMC_No']
invoice_no = record['Invoice_No']
# Delete invoice
cursor.callproc("DeleteInvoice", (invoice_id,))
clear_results(cursor)
cursor.callproc(
'DeleteInpaymentByPMCInvoice',
[record['PMC_No'], record['invoice_no']]
)
# Delete inpayment
cursor.callproc('DeleteInpaymentByPMCInvoice', (pmc_no, invoice_no))
clear_results(cursor)
connection.commit()
execute_db_operation(operation)
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Subcontractor Functions -------------------
def assign_subcontractor(data, village_id):
def operation(cursor):
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
execute_db_operation(operation)
# ------------------- Hold Types Functions -------------------
def insert_hold_types(data, invoice_id):
def operation(cursor):
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = fetch_one(cursor)
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
float(hold_amount or 0)
])
clear_results(cursor)
execute_db_operation(operation)
def get_all_hold_types():
def operation(cursor):
cursor.callproc("GetAllHoldTypes")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Contractor Functions -------------------
def search_contractors(sub_query):
def operation(cursor):
cursor.callproc('SearchContractorsByName', [sub_query])
return fetch_all(cursor)
return execute_db_operation(operation)

View File

@@ -1,39 +1,29 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import os
from datetime import datetime
from flask import current_app
from flask_login import current_user
class LogHelper:
@staticmethod
def log_action(action, details=""):
"""Log user actions with timestamp, user, action, and details."""
logData = LogData()
logData.WriteLog(action, details="")
"""Add a log entry."""
log_data = LogData()
log_data.add_log(action, details)
class LogData:
filepath = ""
timestamp = None
def __init__(self):
self.filepath = os.path.join(current_app.root_path, 'activity.log')
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.user = getattr(current_user, "cn", None) \
or getattr(current_user, "username", None) \
or getattr(current_user, "sAMAccountName", "Unknown")
if hasattr(current_user, "cn") and current_user.cn:
self.user = current_user.cn
elif hasattr(current_user, "username") and current_user.username:
self.user = current_user.username
elif hasattr(current_user, "sAMAccountName") and current_user.sAMAccountName:
self.user = current_user.sAMAccountName
else:
self.user = "Unknown"
def WriteLog(self, action, details=""):
"""Log user actions with timestamp, user, action, and details."""
def add_log(self, action, details=""):
"""Create/Add a log entry."""
with open(self.filepath, "a", encoding="utf-8") as f:
f.write(
f"Timestamp: {self.timestamp} | "
@@ -42,46 +32,73 @@ class LogData:
f"Details: {details}\n"
)
def GetActivitiesLog(self):
def get_all_logs(self):
"""Read all logs."""
logs = []
if os.path.exists(self.filepath):
with open(self.filepath, 'r') as f:
with open(self.filepath, 'r', encoding="utf-8") as f:
for line in f:
parts = line.strip().split(" | ")
if len(parts) == 4:
logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(),
"user": parts[1].replace("User:", "").strip(),
"action": parts[2].replace("Action:", "").strip(),
"details": parts[3].replace("Details:", "").strip()
"timestamp": parts[0].split(":", 1)[1].strip(),
"user": parts[1].split(":", 1)[1].strip(),
"action": parts[2].split(":", 1)[1].strip(),
"details": parts[3].split(":", 1)[1].strip()
})
return logs
def GetFilteredActivitiesLog(self, startDate, endDate, userName):
def get_filtered_logs(self, start_date=None, end_date=None, user_name=None):
"""Filter logs by date and/or user."""
logs = self.get_all_logs()
filtered_logs = self.GetActivitiesLog()
# Filter by date
if start_date or end_date:
start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else datetime.min
end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else datetime.max
logs = [
log for log in logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
# Date filter
if startDate or endDate:
try:
start_dt = datetime.strptime(startDate, "%Y-%m-%d") if startDate else datetime.min
end_dt = datetime.strptime(endDate, "%Y-%m-%d") if endDate else datetime.max
# Filter by username
if user_name:
logs = [log for log in logs if user_name.lower() in log.get("user", "").lower()]
return logs
def update_log(self, index, action=None, details=None):
"""Update a specific log entry by index (0-based)."""
logs = self.get_all_logs()
if 0 <= index < len(logs):
if action:
logs[index]["action"] = action
if details:
logs[index]["details"] = details
self._rewrite_logs_file(logs)
return True
return False
def delete_log(self, index):
"""Delete a specific log entry by index (0-based)."""
logs = self.get_all_logs()
if 0 <= index < len(logs):
logs.pop(index)
self._rewrite_logs_file(logs)
return True
return False
# ------------------- INTERNAL HELPER -------------------
def _rewrite_logs_file(self, logs):
"""Overwrite the log file with current logs."""
with open(self.filepath, "w", encoding="utf-8") as f:
for log in logs:
f.write(
f"Timestamp: {log['timestamp']} | "
f"User: {log['user']} | "
f"Action: {log['action']} | "
f"Details: {log['details']}\n"
)
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
except Exception as e:
print("Date filter error:", e)
#Why catching all exceptions? Need to handle specific exceptions
# Username filter
if userName:
filtered_logs = [log for log in filtered_logs if userName.lower() in log["user"].lower()]
return filtered_logs

View File

@@ -1,14 +1,7 @@
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
# return blocks
from model.Utilities import ResponseHandler, HtmlHelper, ItemCRUDType
import config
import mysql.connector
from mysql.connector import Error
from model.ItemCRUD import ItemCRUD
@@ -19,103 +12,162 @@ class Village:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
self.village = ItemCRUD(itemType=ItemCRUDType.Village)
# 🔹 Helper: sync status
def _set_status(self, village):
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
# 🔹 Helper: get request data
def _get_form_data(self, request):
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
return block_id, village_name
def AddVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
if not village_name:
self.isSuccess = False
self.resultMessage = "Village name cannot be empty"
return
village.AddItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlock", storedprocadd="SaveVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
#self.isSuccess = False
try:
self.village.AddItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlock",
storedprocadd="SaveVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllVillages(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagesdata = village.GetAllData(request=request, storedproc="GetAllVillages")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return villagesdata
try:
villagesdata = self.village.GetAllData(
request=request,
storedproc="GetAllVillages"
)
self._set_status(self.village)
return villagesdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return []
def CheckVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
result = village.CheckItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlocks")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return result
block_id, village_name = self._get_form_data(request)
if not village_name:
self.isSuccess = False
self.resultMessage = "Village name cannot be empty"
return None
try:
result = self.village.CheckItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlocks"
)
self._set_status(self.village)
return result
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def DeleteVillage(self, request, village_id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
try:
self.village.DeleteItem(
request=request,
itemID=village_id,
storedprocDelete="DeleteVillage"
)
self._set_status(self.village)
village.DeleteItem(request=request, itemID=village_id, storedprocDelete="DeleteVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def EditVillage(self, request, village_id):
corsor=None
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
village.EditItem(request=request,childid=village_id,parentid=block_id,childname=village_name,storedprocupdate="UpdateVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
# def GetVillageByID(self, request, id):
# village = ItemCRUD(itemType=ItemCRUDType.Village)
# villagedetailsdata = village.GetAllData(request=request, storedproc="GetVillageDetailsById")
# self.isSuccess = village.isSuccess
# self.resultMessage = village.resultMessage
# return villagedetailsdata
def GetVillageByID(self, request, id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagedetailsdata = village.GetDataByID(id=id,storedproc="GetVillageDetailsById")
if villagedetailsdata:
self.isSuccess = True
else:
if not village_name:
self.isSuccess = False
self.resultMessage = "Village not found"
self.resultMessage = "Village name cannot be empty"
return
return villagedetailsdata
try:
self.village.EditItem(
request=request,
childid=village_id,
parentid=block_id,
childname=village_name,
storedprocupdate="UpdateVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllBlocks(self, request):
def GetVillageByID(self, id):
try:
villagedetailsdata = self.village.GetDataByID(
id=id,
storedproc="GetVillageDetailsById"
)
if villagedetailsdata:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Village not found"
return villagedetailsdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def GetAllBlocks(self):
blocks = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc('GetAllBlocks')
for result in cursor.stored_results():
blocks = result.fetchall()
with connection.cursor() as cursor:
cursor.callproc('GetAllBlocks')
for result in cursor.stored_results():
blocks.extend(result.fetchall())
self.isSuccess = True
return blocks
except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("block"), 500)
finally:
cursor.close()
connection.close()
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.fetch_failure("block"), 500
)
return []
return blocks
finally:
connection.close()

View File

@@ -1,150 +1,246 @@
import config
import mysql.connector
# from flask import request
# from model.ItemCRUD import ItemCRUD
# from model.Utilities import ItemCRUDType
class GSTReleasemodel:
# class GSTRelease:
# """CRUD operations for GST Release using ItemCRUD"""
# def __init__(self):
# self.isSuccess = False
# self.resultMessage = ""
# # ------------------- Add GST Release -------------------
# def AddGSTRelease(self, request):
# pmc_no = request.form.get('PMC_No', '').strip()
# invoice_no = request.form.get('invoice_No', '').strip()
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.AddItem(
# request=request,
# parentid=None,
# childname=f"{pmc_no}-{invoice_no}",
# storedprocfetch="CheckGSTReleaseExists",
# storedprocadd="AddGSTReleaseFromExcel" # your stored procedure handles extra fields
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# # ------------------- Get All GST Releases -------------------
# def GetAllGSTReleases(self):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# # Pass request=None for fetch
# rows = gst.GetAllData(request=None, storedproc="GetAllGSTReleases")
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# data = []
# for row in rows:
# data.append({
# "gst_release_id": row[0],
# "pmc_no": row[1],
# "invoice_no": row[2],
# "basic_amount": row[3],
# "final_amount": row[4],
# "total_amount": row[5],
# "utr": row[6],
# "contractor_id": row[7]
# })
# return data
# # ------------------- Get GST Release By ID -------------------
# def GetGSTReleaseByID(self, gst_release_id):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# row = gst.GetDataByID(gst_release_id, request=None, storedproc="GetGSTReleaseById")
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# if row:
# return {
# "gst_release_id": row[0],
# "pmc_no": row[1],
# "invoice_no": row[2],
# "basic_amount": row[3],
# "final_amount": row[4],
# "total_amount": row[5],
# "utr": row[6],
# "contractor_id": row[7]
# }
# return None
# # ------------------- Edit GST Release -------------------
# def EditGSTRelease(self, request, gst_release_id):
# pmc_no = request.form.get('PMC_No', '').strip()
# invoice_no = request.form.get('invoice_No', '').strip()
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.EditItem(
# request=request,
# childid=gst_release_id,
# parentid=None,
# childname=f"{pmc_no}-{invoice_no}",
# storedprocupdate="UpdateGSTRelease" # stored procedure handles extra fields
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# # ------------------- Delete GST Release -------------------
# def DeleteGSTRelease(self, gst_release_id):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.DeleteItem(
# itemID=gst_release_id,
# request=None,
# storedprocDelete="DeleteGSTReleaseById"
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
from flask import request, jsonify
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GSTRelease:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ------------------- Add GST Release -------------------
def AddGSTRelease(self, request):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
data = {
"PMC_No": request.form.get("PMC_No", "").strip(),
"Invoice_No": request.form.get("Invoice_No", "").strip(),
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
"UTR": request.form.get("UTR", "").strip(),
"Contractor_ID": int(request.form.get("Contractor_ID", 0) or 0)
}
gst.AddItem(
request=request,
data=data,
storedprocfetch="CheckGSTReleaseExists",
storedprocadd="AddGSTReleaseFromExcel"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in AddGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Edit GST Release -------------------
def EditGSTRelease(self, request, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
data = {
"PMC_No": request.form.get("PMC_No", "").strip(),
"Invoice_No": request.form.get("Invoice_No", "").strip(),
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
"UTR": request.form.get("UTR", "").strip()
}
gst.EditItem(
request=request,
childid=gst_release_id,
data=data,
storedprocupdate="UpdateGSTRelease"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in EditGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Delete GST Release -------------------
def DeleteGSTRelease(self, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst.DeleteItem(
request=None,
itemID=gst_release_id,
storedprocDelete="DeleteGSTReleaseById"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in DeleteGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Get All GST Releases -------------------
def GetAllGSTReleases(self):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
rows = gst.GetAllData(None, "GetAllGSTReleases")
data = []
for row in rows:
data.append({
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
})
return data
except Exception as e:
print("ERROR in GetAllGSTReleases:", e)
return []
# ------------------- Get GST Release By ID -------------------
def GetGSTReleaseByID(self, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
row = gst.GetDataByID(gst_release_id, "GetGSTReleaseById")
if row:
return {
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
}
@staticmethod
def get_connection():
connection = config.get_db_connection()
if not connection:
return None
return connection
@staticmethod
def fetch_all_gst_releases():
connection = GSTReleasemodel.get_connection()
gst_releases = []
if connection:
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetAllGSTReleases')
gst_releases = []
for result in cursor.stored_results(): # change to procedure
gst_releases = result.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching GST releases: {e}")
finally:
cursor.close()
connection.close()
return gst_releases
@staticmethod
def insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
# Insert into gst_release
cursor.callproc(
'InsertGSTReleaseOnly',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id]
)
# Insert into inpayment
cursor.callproc(
'InsertInpaymentOnly',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id]
)
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error inserting GST release: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def fetch_gst_release_by_id(gst_release_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return None
data = {}
try:
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetGSTReleaseById', [gst_release_id])
for result in cursor.stored_results():
data = result.fetchone()
if data:
# Convert to array for template
data = [
data.get('GST_Release_Id'),
data.get('PMC_No'),
data.get('Invoice_No'),
data.get('Basic_Amount'),
data.get('Final_Amount'),
data.get('Total_Amount'),
data.get('UTR')
]
except mysql.connector.Error as e:
print(f"Error fetching GST release by id: {e}")
finally:
cursor.close()
connection.close()
return data
@staticmethod
def update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr):
connection = GSTReleasemodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
# Update gst_release
cursor.callproc(
'UpdateGSTRelease',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, gst_release_id]
)
# Update inpayment
cursor.callproc(
'UpdateInpaymentByUTR',
[basic_amount, final_amount, total_amount, utr]
)
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating GST release: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def delete_gst_release(gst_release_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return False, None
try:
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetGSTReleaseUTRById', [gst_release_id])
record = None
for result in cursor.stored_results():
record = result.fetchone()
if not record:
return False, None
utr = record['UTR']
# Step 1: Delete gst_release
cursor.callproc('DeleteGSTReleaseById', [gst_release_id])
# Step 2: Reset inpayment using UTR
cursor.callproc('ResetInpaymentByUTR', [utr])
connection.commit()
return True, utr
except mysql.connector.Error as e:
print(f"Error deleting GST release: {e}")
return False, None
finally:
cursor.close()
connection.close()
except Exception as e:
print("ERROR in GetGSTReleaseByID:", e)
return None

View File

@@ -1,8 +1,13 @@
import config
import mysql.connector
import config
import mysql.connector
from enum import Enum
from model.Utilities import ItemCRUDType
class Paymentmodel:
# ---------------- Database Connection ----------------
@staticmethod
def get_connection():
connection = config.get_db_connection()
@@ -10,6 +15,7 @@ class Paymentmodel:
return None
return connection
# ---------------- Payment Methods ----------------
@staticmethod
def fetch_all_payments():
connection = Paymentmodel.get_connection()
@@ -52,14 +58,8 @@ class Paymentmodel:
try:
cursor = connection.cursor()
cursor.callproc('UpdateInpaymentRecord', [
subcontractor_id,
pmc_no,
invoice_no,
amount,
tds_amount,
total_amount,
utr
])
subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr
])
connection.commit()
return True
except mysql.connector.Error as e:
@@ -80,7 +80,6 @@ class Paymentmodel:
cursor.callproc("GetPaymentById", (payment_id,))
for result in cursor.stored_results():
payment_data = result.fetchone()
# Convert to array for template
if payment_data:
payment_data = [
payment_data.get('Payment_Id'),
@@ -117,42 +116,99 @@ class Paymentmodel:
@staticmethod
def delete_payment(payment_id):
"""
Deletes a payment and resets the related inpayment fields in one go.
Returns (success, pmc_no, invoice_no)
"""
connection = Paymentmodel.get_connection()
if not connection:
return False, None, None
try:
cursor = connection.cursor(dictionary=True)
# Fetch PMC & Invoice before deleting
cursor.callproc('GetPaymentPMCInvoiceById', [payment_id])
record = {}
for result in cursor.stored_results():
record = result.fetchone() or {}
if not record:
return False, None, None
pmc_no = record['PMC_No']
invoice_no = record['Invoice_No']
# Step 2: Delete the payment using the stored procedure
# Delete payment
cursor.callproc("DeletePayment", (payment_id,))
connection.commit()
# Step 3: Reset inpayment fields using the stored procedure
# Reset inpayment fields
cursor.callproc("ResetInpayment", [pmc_no, invoice_no])
connection.commit()
return True, pmc_no, invoice_no
except mysql.connector.Error as e:
print(f"Error deleting payment: {e}")
return False, None, None
finally:
cursor.close()
connection.close()
# ---------------- Item CRUD Methods ----------------
@staticmethod
def fetch_items(item_type: ItemCRUDType):
connection = Paymentmodel.get_connection()
items = []
if connection:
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetItemsByType', [item_type.value])
for result in cursor.stored_results():
items = result.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching {item_type.name}: {e}")
finally:
cursor.close()
connection.close()
return items
@staticmethod
def insert_item(item_type: ItemCRUDType, name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('InsertItem', [item_type.value, name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error inserting {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def update_item(item_type: ItemCRUDType, item_id: int, new_name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('UpdateItem', [item_type.value, item_id, new_name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def delete_item(item_type: ItemCRUDType, item_id: int):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('DeleteItem', [item_type.value, item_id])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error deleting {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()

View File

@@ -1,5 +1,6 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Activity Logs</title>
@@ -9,67 +10,79 @@
background-color: #f8f9fa;
margin: 20px;
}
h2 {
text-align: center;
margin-bottom: 20px;
}
form {
display: flex;
gap: 10px;
justify-content: center;
margin-bottom: 20px;
flex-wrap: wrap;
}
input, button {
input,
button {
padding: 8px;
border-radius: 6px;
border: 1px solid #ccc;
}
button {
background-color: #007bff;
color: white;
cursor: pointer;
}
button:hover {
background-color: #0056b3;
}
table {
width: 90%;
margin: auto;
border-collapse: collapse;
background: white;
box-shadow: 0 0 8px rgba(0,0,0,0.1);
box-shadow: 0 0 8px rgba(0, 0, 0, 0.1);
}
th, td {
th,
td {
padding: 12px;
border: 1px solid #ddd;
text-align: center;
}
th {
background-color: #007bff;
color: white;
}
tr:nth-child(even) {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<h2>Activity Logs</h2>
<form method="get" action="{{ url_for('activity_log') }}" class="filter-form">
<label for="username">Username:</label>
<input type="text" id="username" name="username" placeholder="Enter username" value="{{ username or '' }}">
<form method="get" action="{{ url_for('log.activity_log') }}" class="filter-form">
<label for="username">Username:</label>
<input type="text" id="username" name="username" placeholder="Enter username" value="{{ username or '' }}">
<label for="start_date">Start Date:</label>
<input type="date" id="start_date" name="start_date" value="{{ start_date or '' }}">
<label for="start_date">Start Date:</label>
<input type="date" id="start_date" name="start_date" value="{{ start_date or '' }}">
<label for="end_date">End Date:</label>
<input type="date" id="end_date" name="end_date" value="{{ end_date or '' }}">
<label for="end_date">End Date:</label>
<input type="date" id="end_date" name="end_date" value="{{ end_date or '' }}">
<button type="submit" class="btn btn-primary">Filter</button>
<!-- <button type="button" style="background-color: #6c757d;" onclick="resetFilter()">Reset</button> -->
</form>
<button type="submit" class="btn btn-primary">Filter</button>
<button type="button" style="background-color: #6c757d;" onclick="resetFilter()">Reset</button>
</form>
<table>
<tr>
@@ -94,9 +107,10 @@
</table>
<script>
function resetFilter() {
window.location.href = "{{ url_for('activity_log') }}";
}
</script>
function resetFilter() {
window.location.href = "{{ url_for('log.activity_log') }}";
}
</script>
</body>
</html>
</html>