6 Commits

16 changed files with 891 additions and 896 deletions

1
.gitignore vendored
View File

@@ -4,4 +4,3 @@ __pycache__/
static/downloads/ static/downloads/
static/uploads/ static/uploads/

View File

@@ -2,35 +2,27 @@ from flask import Blueprint, render_template, request, redirect, url_for, jsonif
from flask_login import login_required from flask_login import login_required
import config import config
from model.State import State
from model.Block import Block from model.Block import Block
from model.Utilities import HtmlHelper from model.Utilities import HtmlHelper
block_bp = Blueprint('block', __name__) block_bp = Blueprint('block', __name__)
# # --- Add Block page -------
@block_bp.route('/add_block', methods=['GET', 'POST']) @block_bp.route('/add_block', methods=['GET', 'POST'])
@login_required @login_required
def add_block(): def add_block():
block = Block() block = Block()
if request.method == 'POST': if request.method == 'POST':
block.AddBlock(request) block.AddBlock(request)
return block.resultMessage return block.resultMessage
connection = config.get_db_connection() state = State()
cursor = connection.cursor() states = state.GetAllStates(request=request)
cursor.callproc("GetAllStates")
for rs in cursor.stored_results():
states = rs.fetchall()
block_data = block.GetAllBlocks(request) block_data = block.GetAllBlocks(request)
cursor.close()
connection.close()
return render_template( return render_template(
'add_block.html', 'add_block.html',
states=states, states=states,

View File

@@ -27,8 +27,6 @@ def upload():
f"User {current_user.id} Upload Excel File '{file.filename}'" f"User {current_user.id} Upload Excel File '{file.filename}'"
) )
return redirect(url_for('excel.show_table', filename=file.filename)) return redirect(url_for('excel.show_table', filename=file.filename))
else:
return redirect(url_for('upload_excel_file'))
return render_template('uploadExcelFile.html') return render_template('uploadExcelFile.html')

View File

@@ -1,70 +1,46 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify from flask import Blueprint, render_template, request, redirect, url_for
from flask_login import login_required, current_user from flask_login import login_required
from model.gst_release import GSTReleasemodel from model.gst_release import GSTRelease
from model.Log import LogHelper from model.Log import LogHelper
from flask import flash, current_app
gst_release_bp = Blueprint('gst_release_bp', __name__) gst_release_bp = Blueprint('gst_release_bp', __name__)
gst_service = GSTRelease()
# ------------------- Add GST Release ------------------- # ---------------- ADD GST RELEASE ----------------
@gst_release_bp.route('/add_gst_release', methods=['GET', 'POST']) @gst_release_bp.route('/add_gst_release', methods=['GET', 'POST'])
@login_required @login_required
def add_gst_release(): def add_gst_release():
gst_releases_dict = GSTReleasemodel.fetch_all_gst_releases()
gst_releases = [
[
g['GST_Release_Id'], g['PMC_No'], g['invoice_no'],
g['Basic_Amount'], g['Final_Amount'], g['Total_Amount'], g['UTR']
] for g in gst_releases_dict
] if gst_releases_dict else []
if request.method == 'POST': if request.method == 'POST':
pmc_no = request.form['PMC_No'] gst_service.AddGSTRelease(request)
invoice_no = request.form['invoice_No'] LogHelper.log_action("Add GST Release", f"User added GST release")
basic_amount = request.form['basic_amount'] flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
contractor_id = request.form['subcontractor_id']
LogHelper.log_action("Add GST Release", f"User {current_user.id} added GST release '{pmc_no}'")
GSTReleasemodel.insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id)
return redirect(url_for('gst_release_bp.add_gst_release')) return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('add_gst_release.html', gst_releases=gst_releases, invoices=[]) gst_releases = gst_service.GetAllGSTReleases()
return render_template('add_gst_release.html', gst_releases=gst_releases)
# ---------------- EDIT GST RELEASE ----------------
# ------------------- Edit GST Release -------------------
@gst_release_bp.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST']) @gst_release_bp.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required @login_required
def edit_gst_release(gst_release_id): def edit_gst_release(gst_release_id):
gst_release_data = GSTReleasemodel.fetch_gst_release_by_id(gst_release_id) gst_data = gst_service.GetGSTReleaseByID(gst_release_id)
if not gst_release_data: if not gst_data:
return "GST Release not found", 404 return "GST Release not found", 404
if request.method == 'POST': if request.method == 'POST':
pmc_no = request.form['PMC_No'] gst_service.EditGSTRelease(request, gst_release_id)
invoice_no = request.form['invoice_No'] LogHelper.log_action("Edit GST Release", f"User edited GST release")
basic_amount = request.form['basic_amount'] flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
LogHelper.log_action("Edit GST Release", f"User {current_user.id} edited GST release '{pmc_no}'")
GSTReleasemodel.update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr)
return redirect(url_for('gst_release_bp.add_gst_release')) return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('edit_gst_release.html', gst_release_data=gst_release_data, invoices=[]) return render_template('edit_gst_release.html', gst_release_data=gst_data)
# ---------------- DELETE GST RELEASE ----------------
# ------------------- Delete GST Release -------------------
@gst_release_bp.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST']) @gst_release_bp.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required @login_required
def delete_gst_release(gst_release_id): def delete_gst_release(gst_release_id):
success, utr = GSTReleasemodel.delete_gst_release(gst_release_id) gst_service.DeleteGSTRelease(gst_release_id) # remove request
if not success: LogHelper.log_action("Delete GST Release", f"User deleted GST release")
return jsonify({"message": "GST Release not found or failed to delete", "status": "error"}), 404 flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
return redirect(url_for('gst_release_bp.add_gst_release'))
LogHelper.log_action("Delete GST Release", f"User {current_user.id} deleted GST release '{gst_release_id}'")
return jsonify({"message": f"GST Release {gst_release_id} deleted successfully.", "status": "success"}), 200

View File

@@ -1,3 +1,6 @@
# controllers/invoice_controller.py # controllers/invoice_controller.py
from flask import Blueprint, request, jsonify, render_template from flask import Blueprint, request, jsonify, render_template
@@ -7,92 +10,92 @@ from model.Log import LogHelper
invoice_bp = Blueprint('invoice', __name__) invoice_bp = Blueprint('invoice', __name__)
# -------------------------------- Add Invoice --------------------------------- # ------------------------------- Helpers -------------------------------
def handle_exception(func):
"""Decorator to handle exceptions and return JSON error responses."""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
wrapper.__name__ = func.__name__
return wrapper
def log_action(action: str, detail: str):
LogHelper.log_action(action, f"User {current_user.id} {detail}")
# ------------------------------- Add Invoice -------------------------------
@invoice_bp.route('/add_invoice', methods=['GET', 'POST']) @invoice_bp.route('/add_invoice', methods=['GET', 'POST'])
@login_required @login_required
@handle_exception
def add_invoice(): def add_invoice():
if request.method == 'POST': if request.method == 'POST':
try: data = request.form
village_name = request.form.get('village') village_name = data.get('village')
village_result = get_village_id(village_name) village_result = get_village_id(village_name)
if not village_result: if not village_result:
return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400 return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
village_id = village_result['Village_Id'] village_id = village_result['Village_Id']
data = request.form
invoice_id = insert_invoice(data, village_id) invoice_id = insert_invoice(data, village_id)
assign_subcontractor(data, village_id) assign_subcontractor(data, village_id)
insert_hold_types(data, invoice_id) insert_hold_types(data, invoice_id)
LogHelper.log_action("Add invoice", f"User {current_user.id} Added invoice '{data.get('pmc_no')}'") log_action("Add invoice", f"added invoice '{data.get('pmc_no')}'")
return jsonify({"status": "success", "message": "Invoice added successfully"}), 201 return jsonify({"status": "success", "message": "Invoice added successfully"}), 201
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
invoices = get_all_invoice_details() invoices = get_all_invoice_details()
villages = get_all_villages() villages = get_all_villages()
return render_template('add_invoice.html', invoices=invoices, villages=villages) return render_template('add_invoice.html', invoices=invoices, villages=villages)
# ------------------- Search Subcontractor ------------------- # ------------------------------- Search Subcontractor -------------------------------
@invoice_bp.route('/search_subcontractor', methods=['POST']) @invoice_bp.route('/search_subcontractor', methods=['POST'])
@login_required @login_required
@handle_exception
def search_subcontractor(): def search_subcontractor():
sub_query = request.form.get("query") query = request.form.get("query", "").strip()
results = search_contractors(sub_query) results = search_contractors(query)
if not results: if not results:
return "<li>No subcontractor found</li>" return "<li>No subcontractor found</li>"
output = "".join( return "".join(f"<li data-id='{r['Contractor_Id']}'>{r['Contractor_Name']}</li>" for r in results)
f"<li data-id='{row['Contractor_Id']}'>{row['Contractor_Name']}</li>"
for row in results
)
return output
# ------------------- Get Hold Types ------------------- # ------------------------------- Get Hold Types -------------------------------
@invoice_bp.route('/get_hold_types', methods=['GET']) @invoice_bp.route('/get_hold_types', methods=['GET'])
@login_required @login_required
@handle_exception
def get_hold_types(): def get_hold_types():
hold_types = get_all_hold_types() hold_types = get_all_hold_types()
LogHelper.log_action("Get hold type", f"User {current_user.id} Get hold type '{hold_types}'") log_action("Get hold type", f"retrieved hold types '{hold_types}'")
return jsonify(hold_types) return jsonify(hold_types)
# ------------------- Edit Invoice ------------------- # ------------------------------- Edit Invoice -------------------------------
@invoice_bp.route('/edit_invoice/<int:invoice_id>', methods=['GET', 'POST']) @invoice_bp.route('/edit_invoice/<int:invoice_id>', methods=['GET', 'POST'])
@login_required @login_required
@handle_exception
def edit_invoice(invoice_id): def edit_invoice(invoice_id):
if request.method == 'POST': if request.method == 'POST':
data = request.form data = request.form
update_invoice(data, invoice_id) update_invoice(data, invoice_id)
update_inpayment(data) update_inpayment(data)
log_action("Edit invoice", f"edited invoice '{invoice_id}'")
LogHelper.log_action("Edit invoice", f"User {current_user.id} Edit invoice '{invoice_id}'")
return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200 return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200
invoice = get_invoice_by_id(invoice_id) invoice = get_invoice_by_id(invoice_id)
return render_template('edit_invoice.html', invoice=invoice) return render_template('edit_invoice.html', invoice=invoice)
# ------------------- Delete Invoice ------------------- # ------------------------------- Delete Invoice -------------------------------
@invoice_bp.route('/delete_invoice/<int:invoice_id>', methods=['GET']) @invoice_bp.route('/delete_invoice/<int:invoice_id>', methods=['GET'])
@login_required @login_required
@handle_exception
def delete_invoice_route(invoice_id): def delete_invoice_route(invoice_id):
try:
delete_invoice_data(invoice_id, current_user.id) delete_invoice_data(invoice_id, current_user.id)
LogHelper.log_action("Delete Invoice", f"User {current_user.id} deleted Invoice '{invoice_id}'") log_action("Delete invoice", f"deleted invoice '{invoice_id}'")
return jsonify({ return jsonify({"status": "success", "message": f"Invoice {invoice_id} deleted successfully."})
"message": f"Invoice {invoice_id} deleted successfully.",
"status": "success"
})
except Exception as e:
return jsonify({
"message": str(e),
"status": "error"
}), 500

View File

@@ -1,4 +1,4 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify from flask import Blueprint, render_template, request, redirect, url_for, jsonify, flash
from flask_login import login_required, current_user from flask_login import login_required, current_user
from model.payment import Paymentmodel from model.payment import Paymentmodel
from model.Log import LogHelper from model.Log import LogHelper
@@ -88,16 +88,14 @@ def edit_payment(payment_id):
# ------------------- Delete Payment ------------------- # ------------------- Delete Payment -------------------
@payment_bp.route('/delete_payment/<int:payment_id>', methods=['GET', 'POST']) @payment_bp.route('/delete_payment/<int:payment_id>', methods=['POST'])
@login_required @login_required
def delete_payment(payment_id): def delete_payment(payment_id):
success, pmc_no, invoice_no = Paymentmodel.delete_payment(payment_id) success, pmc_no, invoice_no = Paymentmodel.delete_payment(payment_id)
if not success: if not success:
return jsonify({"message": "Payment not found or failed to delete", "status": "error"}), 404 flash("Payment not found or failed to delete", "error")
else:
LogHelper.log_action("Delete Payment", f"User {current_user.id} deleted Payment '{payment_id}'") LogHelper.log_action("Delete Payment", f"User {current_user.id} deleted Payment '{payment_id}'")
flash(f"Payment ID {payment_id} deleted successfully.", "success")
return jsonify({ return redirect(url_for('payment_bp.add_payment'))
"message": f"Payment ID {payment_id} deleted successfully.",
"status": "success"
}), 200

View File

@@ -1,34 +1,11 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required from flask_login import login_required
from model.Subcontractor import Subcontractor from model.Subcontractor import Subcontractor
from model.Utilities import HtmlHelper, ResponseHandler
subcontractor_bp = Blueprint('subcontractor', __name__) subcontractor_bp = Blueprint('subcontractor', __name__)
# ----------------------------------------------------------
# Helpers (unchanged)
# ----------------------------------------------------------
class HtmlHelper:
@staticmethod
def json_response(data, status=200):
return jsonify(data), status
class ResponseHandler:
@staticmethod
def fetch_failure(entity):
return {"status": "error", "message": f"Failed to fetch {entity}"}
@staticmethod
def add_failure(entity):
return {"status": "error", "message": f"Failed to add {entity}"}
@staticmethod
def update_failure(entity):
return {"status": "error", "message": f"Failed to update {entity}"}
@staticmethod
def delete_failure(entity):
return {"status": "error", "message": f"Failed to delete {entity}"}
# ---------------------------------------------------------- # ----------------------------------------------------------
# LIST + ADD # LIST + ADD

View File

@@ -1,8 +1,11 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required from flask_login import login_required
import config import config
from model.Village import Village from model.Village import Village
from model.State import State from model.State import State
@@ -23,7 +26,6 @@ def add_village():
state = State() state = State()
states = state.GetAllStates(request=request) states = state.GetAllStates(request=request)
villages = village.GetAllVillages(request=request) villages = village.GetAllVillages(request=request)
return render_template( return render_template(
@@ -42,7 +44,6 @@ def get_districts(state_id):
cursor = connection.cursor() cursor = connection.cursor()
cursor.callproc("GetDistrictByStateID", [state_id]) cursor.callproc("GetDistrictByStateID", [state_id])
districts = [] districts = []
for rs in cursor.stored_results(): for rs in cursor.stored_results():
@@ -51,15 +52,7 @@ def get_districts(state_id):
cursor.close() cursor.close()
connection.close() connection.close()
district_list = [] return jsonify([{"id": d[0], "name": d[1]} for d in districts])
for d in districts:
district_list.append({
"id": d[0],
"name": d[1]
})
return jsonify(district_list)
# ------------------------- Fetch Blocks ------------------------- # ------------------------- Fetch Blocks -------------------------
@@ -71,7 +64,6 @@ def get_blocks(district_id):
cursor = connection.cursor() cursor = connection.cursor()
cursor.callproc("GetBlocksByDistrictID", [district_id]) cursor.callproc("GetBlocksByDistrictID", [district_id])
blocks = [] blocks = []
for rs in cursor.stored_results(): for rs in cursor.stored_results():
@@ -80,22 +72,13 @@ def get_blocks(district_id):
cursor.close() cursor.close()
connection.close() connection.close()
block_list = [] return jsonify([{"id": b[0], "name": b[1]} for b in blocks])
for b in blocks:
block_list.append({
"id": b[0],
"name": b[1]
})
return jsonify(block_list)
# ------------------------- Check Village ------------------------- # ------------------------- Check Village -------------------------
@village_bp.route('/check_village', methods=['POST']) @village_bp.route('/check_village', methods=['POST'])
@login_required @login_required
def check_village(): def check_village():
village = Village() village = Village()
return village.CheckVillage(request=request) return village.CheckVillage(request=request)
@@ -106,14 +89,9 @@ def check_village():
def delete_village(village_id): def delete_village(village_id):
village = Village() village = Village()
village.DeleteVillage(request=request, village_id=village_id) village.DeleteVillage(request=request, village_id=village_id)
if not village.isSuccess: flash(village.resultMessage, "success" if village.isSuccess else "error")
flash(village.resultMessage, "error")
else:
flash(village.resultMessage, "success")
return redirect(url_for('village.add_village')) return redirect(url_for('village.add_village'))
@@ -135,8 +113,8 @@ def edit_village(village_id):
else: else:
flash(village.resultMessage, "error") flash(village.resultMessage, "error")
village_data = village.GetVillageByID(request=request, id=village_id) village_data = village.GetVillageByID(id=village_id) or []
blocks = village.GetAllBlocks(request=request) blocks = village.GetAllBlocks() or []
return render_template( return render_template(
'edit_village.html', 'edit_village.html',
@@ -145,23 +123,17 @@ def edit_village(village_id):
) )
else: else:
# ✅ FIXED HERE (removed request)
village_data = village.GetVillageByID(request=request, id=village_id) village_data = village.GetVillageByID(id=village_id)
if not village.isSuccess: if not village.isSuccess:
flash(village.resultMessage, "error") flash(village.resultMessage, "error")
return redirect(url_for('village.add_village')) return redirect(url_for('village.add_village'))
blocks = village.GetAllBlocks(request=request) blocks = village.GetAllBlocks() or []
if village_data is None:
village_data = []
if blocks is None:
blocks = []
return render_template( return render_template(
'edit_village.html', 'edit_village.html',
village_data=village_data, village_data=village_data or [],
blocks=blocks blocks=blocks
) )

View File

@@ -1,72 +1,65 @@
import mysql.connector
from mysql.connector import Error from mysql.connector import Error
import config import config
import openpyxl
import os
import re
import ast
from datetime import datetime from datetime import datetime
class ContractorInfo: class ContractorInfo:
ID = "" def __init__(self, contractor_id):
contInfo = None self.ID = contractor_id
def __init__(self, id): self.contInfo = None
self.ID = id
print(id)
self.fetchData() self.fetchData()
def fetchData(self): def fetchData(self):
"""Fetch basic contractor info by ID."""
try: try:
connection = config.get_db_connection() connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True) with connection.cursor(dictionary=True, buffered=True) as cursor:
cursor.callproc('GetContractorInfoById', [self.ID]) cursor.callproc('GetContractorInfoById', [self.ID])
# Get the first result set
for result in cursor.stored_results(): for result in cursor.stored_results():
self.contInfo = result.fetchone() self.contInfo = result.fetchone()
except Error as e:
print(self.contInfo,flush=True) print(f"Error fetching contractor info: {e}")
finally: finally:
cursor.close() if connection.is_connected():
connection.close() connection.close()
def fetchalldata(self): def fetchalldata(self):
"""Fetch hold types and invoices for contractor."""
data = {}
try: try:
connection = config.get_db_connection() connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True) with connection.cursor(dictionary=True, buffered=True) as cursor:
print("here", flush=True) # Fetch Hold Types
# ---------------- Hold Types ----------------
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetHoldTypesByContractor', [self.ID]) cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = [] hold_types = []
for result in cursor.stored_results(): for result in cursor.stored_results():
hold_types = result.fetchall() hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
data['hold_types'] = hold_type_map
# ---------------- Invoices ---------------- # Fetch Invoices
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetInvoicesByContractor', [self.ID]) cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = [] invoices = []
for result in cursor.stored_results(): for result in cursor.stored_results():
invoices = result.fetchall() invoices = result.fetchall()
# Remove duplicate invoices # Remove duplicate invoices
invoice_ids_seen = set() seen_ids = set()
unique_invoices = [] unique_invoices = []
for inv in invoices: for inv in invoices:
if inv["Invoice_Id"] not in invoice_ids_seen: if inv['Invoice_Id'] not in seen_ids:
invoice_ids_seen.add(inv["Invoice_Id"]) seen_ids.add(inv['Invoice_Id'])
unique_invoices.append(inv) unique_invoices.append(inv)
invoices = unique_invoices data['invoices'] = unique_invoices
except Error as e:
print(f"Error fetching contractor data: {e}")
finally: finally:
cursor.close() if connection.is_connected():
connection.close() connection.close()
return data

View File

@@ -1,55 +1,51 @@
import config from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GST: class GST:
@staticmethod @staticmethod
def get_unreleased_gst(): def get_unreleased_gst():
# Use ItemCRUD for Invoices
invoice_crud = ItemCRUD(itemType=ItemCRUDType.Invoice)
invoices_rows = invoice_crud.GetAllData(storedproc="GetAllInvoicesBasic")
connection = config.get_db_connection() if not invoice_crud.isSuccess:
cursor = connection.cursor(dictionary=True) return [] # Could also log invoice_crud.resultMessage
try: invoices = [
# ----------- Invoices ----------- dict(
cursor.callproc('GetAllInvoicesBasic') Invoice_No=row[1],
invoices = [] GST_SD_Amount=float(row[2]) if row[2] is not None else 0
for result in cursor.stored_results(): )
invoices = result.fetchall() for row in invoices_rows
]
# Use ItemCRUD for GST Releases
gst_crud = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst_rows = gst_crud.GetAllData(storedproc="GetAllGSTReleasesBasic")
# ----------- GST Releases ----------- if not gst_crud.isSuccess:
cursor.callproc('GetAllGSTReleasesBasic') return [] # Could also log gst_crud.resultMessage
gst_releases = []
for result in cursor.stored_results():
gst_releases = result.fetchall()
gst_invoice_nos = { gst_invoice_nos = {
g['Invoice_No'] g[2] # Invoice_No is at index 2
for g in gst_releases for g in gst_rows
if g['Invoice_No'] if g[2]
} }
gst_basic_amounts = { gst_basic_amounts = {
float(g['Basic_Amount']) float(g[3]) # Basic_Amount at index 3
for g in gst_releases for g in gst_rows
if g['Basic_Amount'] is not None if g[3] is not None
} }
# Filter unreleased invoices
unreleased = [] unreleased = []
for inv in invoices: for inv in invoices:
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
match_by_gst_amount = inv['GST_SD_Amount'] in gst_basic_amounts
match_by_gst_amount = float(
inv.get('GST_SD_Amount') or 0
) in gst_basic_amounts
if not (match_by_invoice or match_by_gst_amount): if not (match_by_invoice or match_by_gst_amount):
unreleased.append(inv) unreleased.append(inv)
return unreleased return unreleased
finally:
cursor.close()
connection.close()

View File

@@ -1,274 +1,29 @@
import config import config
import mysql.connector import mysql.connector
# ------------------- Helper ------------------- # ------------------- Helper Functions -------------------
def clear_results(cursor): def clear_results(cursor):
"""Consume all stored results to prevent cursor issues."""
for r in cursor.stored_results(): for r in cursor.stored_results():
r.fetchall() r.fetchall()
def fetch_one(cursor):
# ------------------- Get Village Id ------------------- """Fetch first row from stored results."""
def get_village_id(village_name):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetVillageIdByName", (village_name,))
village_result = None
for rs in cursor.stored_results():
village_result = rs.fetchone()
cursor.close()
connection.close()
return village_result
# ------------------- Insert Invoice -------------------
def insert_invoice(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
# 1. Insert Invoice
cursor.callproc('InsertInvoice', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0)
])
invoice_id = None
for result in cursor.stored_results(): for result in cursor.stored_results():
row = result.fetchone() return result.fetchone()
if row: return None
invoice_id = row.get('invoice_id')
if not invoice_id:
raise Exception("Invoice ID not returned")
# 2. Insert Inpayment
cursor.callproc('InsertInpayment', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
data.get('subcontractor_id')
])
clear_results(cursor)
connection.commit()
return invoice_id
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Assign Subcontractor -------------------
def assign_subcontractor(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Insert Hold Types -------------------
def insert_hold_types(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = None
def fetch_all(cursor):
"""Fetch all rows from stored results."""
data = []
for result in cursor.stored_results(): for result in cursor.stored_results():
hold_type_result = result.fetchone() data.extend(result.fetchall())
return data
if not hold_type_result: def get_numeric_values(data):
cursor.callproc('InsertHoldType', [hold_type, 0]) """Return numeric fields for invoices safely."""
cursor.execute("SELECT @_InsertHoldType_1") return [
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
hold_amount = float(hold_amount or 0)
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
hold_amount
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Get All Invoices -------------------
def get_all_invoice_details():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetAllInvoiceDetails')
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.close()
connection.close()
return invoices
# ------------------- Get All Villages -------------------
def get_all_villages():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllVillages")
villages = []
for result in cursor.stored_results():
villages = result.fetchall()
cursor.close()
connection.close()
return villages
# ------------------- Search Contractors -------------------
def search_contractors(sub_query):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('SearchContractorsByName', [sub_query])
results = []
for result in cursor.stored_results():
results = result.fetchall()
cursor.close()
connection.close()
return results
# ------------------- Get All Hold Types -------------------
def get_all_hold_types():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllHoldTypes")
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
cursor.close()
connection.close()
return hold_types
# ------------------- Get Invoice By Id -------------------
def get_invoice_by_id(invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = None
for result in cursor.stored_results():
invoice = result.fetchone()
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = []
for result in cursor.stored_results():
hold_amounts = result.fetchall()
if invoice:
invoice["hold_amounts"] = hold_amounts
cursor.close()
connection.close()
return invoice
# ------------------- Update Invoice -------------------
def update_invoice(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc("GetVillageIdByName", (data.get('village'),))
village = None
for rs in cursor.stored_results():
village = rs.fetchone()
village_id = village['Village_Id']
numeric = [
float(data.get('basic_amount') or 0), float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0), float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0), float(data.get('after_debit_amount') or 0),
@@ -282,6 +37,97 @@ def update_invoice(data, invoice_id):
float(data.get('final_amount') or 0), float(data.get('final_amount') or 0),
] ]
def execute_db_operation(operation_func):
"""General DB operation wrapper with commit/rollback."""
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
result = operation_func(cursor)
connection.commit()
return result
except Exception:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
# ------------------- Village Functions -------------------
def get_village_id(village_name):
def operation(cursor):
cursor.callproc("GetVillageIdByName", (village_name,))
return fetch_one(cursor)
return execute_db_operation(operation)
def get_all_villages():
def operation(cursor):
cursor.callproc("GetAllVillages")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Invoice Functions -------------------
def insert_invoice(data, village_id):
def operation(cursor):
# Insert invoice
cursor.callproc('InsertInvoice', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
*get_numeric_values(data)
])
invoice_row = fetch_one(cursor)
if not invoice_row:
raise Exception("Invoice ID not returned")
invoice_id = invoice_row.get('invoice_id')
# Insert inpayment
cursor.callproc('InsertInpayment', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
*get_numeric_values(data),
data.get('subcontractor_id')
])
clear_results(cursor)
return invoice_id
return execute_db_operation(operation)
def get_all_invoice_details():
def operation(cursor):
cursor.callproc('GetAllInvoiceDetails')
return fetch_all(cursor)
return execute_db_operation(operation)
def get_invoice_by_id(invoice_id):
def operation(cursor):
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = fetch_one(cursor)
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = fetch_all(cursor)
if invoice:
invoice["hold_amounts"] = hold_amounts
return invoice
return execute_db_operation(operation)
def update_invoice(data, invoice_id):
def operation(cursor):
cursor.callproc("GetVillageIdByName", (data.get('village'),))
village = fetch_one(cursor)
if not village:
raise Exception("Village not found")
village_id = village['Village_Id']
cursor.callproc('UpdateInvoice', [ cursor.callproc('UpdateInvoice', [
data.get('pmc_no'), data.get('pmc_no'),
village_id, village_id,
@@ -289,91 +135,101 @@ def update_invoice(data, invoice_id):
data.get('invoice_details'), data.get('invoice_details'),
data.get('invoice_date'), data.get('invoice_date'),
data.get('invoice_no'), data.get('invoice_no'),
*numeric, *get_numeric_values(data),
invoice_id invoice_id
]) ])
clear_results(cursor) clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Update Inpayment -------------------
def update_inpayment(data): def update_inpayment(data):
connection = config.get_db_connection() def operation(cursor):
cursor = connection.cursor(dictionary=True)
try:
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
cursor.callproc('UpdateInpayment', [ cursor.callproc('UpdateInpayment', [
data.get('work_type'), data.get('work_type'),
data.get('invoice_details'), data.get('invoice_details'),
data.get('invoice_date'), data.get('invoice_date'),
*numeric, *get_numeric_values(data),
data.get('pmc_no'), data.get('pmc_no'),
data.get('invoice_no') data.get('invoice_no')
]) ])
clear_results(cursor) clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Delete Invoice -------------------
def delete_invoice_data(invoice_id, user_id): def delete_invoice_data(invoice_id, user_id):
connection = config.get_db_connection() def operation(cursor):
cursor = connection.cursor(dictionary=True) # Fetch PMC and Invoice_No from DB
cursor.callproc('GetInvoicePMCById', (invoice_id,))
try:
cursor.callproc('GetInvoicePMCById', [invoice_id])
record = {} record = {}
for result in cursor.stored_results(): for result in cursor.stored_results():
record = result.fetchone() or {} record = result.fetchone() or {}
if not record: if not record:
raise Exception("Invoice not found") raise Exception("Invoice not found")
# Use exact DB keys
pmc_no = record['PMC_No']
invoice_no = record['Invoice_No']
# Delete invoice
cursor.callproc("DeleteInvoice", (invoice_id,)) cursor.callproc("DeleteInvoice", (invoice_id,))
clear_results(cursor) clear_results(cursor)
cursor.callproc( # Delete inpayment
'DeleteInpaymentByPMCInvoice', cursor.callproc('DeleteInpaymentByPMCInvoice', (pmc_no, invoice_no))
[record['PMC_No'], record['invoice_no']] clear_results(cursor)
)
connection.commit() execute_db_operation(operation)
except Exception as e:
connection.rollback()
raise e
finally: # ------------------- Subcontractor Functions -------------------
cursor.close() def assign_subcontractor(data, village_id):
connection.close() def operation(cursor):
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
execute_db_operation(operation)
# ------------------- Hold Types Functions -------------------
def insert_hold_types(data, invoice_id):
def operation(cursor):
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = fetch_one(cursor)
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
float(hold_amount or 0)
])
clear_results(cursor)
execute_db_operation(operation)
def get_all_hold_types():
def operation(cursor):
cursor.callproc("GetAllHoldTypes")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Contractor Functions -------------------
def search_contractors(sub_query):
def operation(cursor):
cursor.callproc('SearchContractorsByName', [sub_query])
return fetch_all(cursor)
return execute_db_operation(operation)

View File

@@ -1,39 +1,29 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import os import os
from datetime import datetime
from flask import current_app
from flask_login import current_user
class LogHelper: class LogHelper:
@staticmethod @staticmethod
def log_action(action, details=""): def log_action(action, details=""):
"""Log user actions with timestamp, user, action, and details.""" """Add a log entry."""
logData = LogData() log_data = LogData()
logData.WriteLog(action, details="") log_data.add_log(action, details)
class LogData: class LogData:
filepath = ""
timestamp = None
def __init__(self): def __init__(self):
self.filepath = os.path.join(current_app.root_path, 'activity.log') self.filepath = os.path.join(current_app.root_path, 'activity.log')
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.user = getattr(current_user, "cn", None) \
or getattr(current_user, "username", None) \
or getattr(current_user, "sAMAccountName", "Unknown")
if hasattr(current_user, "cn") and current_user.cn:
self.user = current_user.cn
elif hasattr(current_user, "username") and current_user.username:
self.user = current_user.username
elif hasattr(current_user, "sAMAccountName") and current_user.sAMAccountName:
self.user = current_user.sAMAccountName
else:
self.user = "Unknown"
def WriteLog(self, action, details=""):
"""Log user actions with timestamp, user, action, and details."""
def add_log(self, action, details=""):
"""Create/Add a log entry."""
with open(self.filepath, "a", encoding="utf-8") as f: with open(self.filepath, "a", encoding="utf-8") as f:
f.write( f.write(
f"Timestamp: {self.timestamp} | " f"Timestamp: {self.timestamp} | "
@@ -42,46 +32,73 @@ class LogData:
f"Details: {details}\n" f"Details: {details}\n"
) )
def get_all_logs(self):
def GetActivitiesLog(self): """Read all logs."""
logs = [] logs = []
if os.path.exists(self.filepath): if os.path.exists(self.filepath):
with open(self.filepath, 'r') as f: with open(self.filepath, 'r', encoding="utf-8") as f:
for line in f: for line in f:
parts = line.strip().split(" | ") parts = line.strip().split(" | ")
if len(parts) == 4: if len(parts) == 4:
logs.append({ logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(), "timestamp": parts[0].split(":", 1)[1].strip(),
"user": parts[1].replace("User:", "").strip(), "user": parts[1].split(":", 1)[1].strip(),
"action": parts[2].replace("Action:", "").strip(), "action": parts[2].split(":", 1)[1].strip(),
"details": parts[3].replace("Details:", "").strip() "details": parts[3].split(":", 1)[1].strip()
}) })
return logs return logs
def GetFilteredActivitiesLog(self, startDate, endDate, userName): def get_filtered_logs(self, start_date=None, end_date=None, user_name=None):
"""Filter logs by date and/or user."""
logs = self.get_all_logs()
filtered_logs = self.GetActivitiesLog() # Filter by date
if start_date or end_date:
# Date filter start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else datetime.min
if startDate or endDate: end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else datetime.max
try: logs = [
start_dt = datetime.strptime(startDate, "%Y-%m-%d") if startDate else datetime.min log for log in logs
end_dt = datetime.strptime(endDate, "%Y-%m-%d") if endDate else datetime.max
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
] ]
# Filter by username
if user_name:
logs = [log for log in logs if user_name.lower() in log.get("user", "").lower()]
except Exception as e: return logs
print("Date filter error:", e)
#Why catching all exceptions? Need to handle specific exceptions def update_log(self, index, action=None, details=None):
"""Update a specific log entry by index (0-based)."""
logs = self.get_all_logs()
if 0 <= index < len(logs):
if action:
logs[index]["action"] = action
if details:
logs[index]["details"] = details
self._rewrite_logs_file(logs)
return True
return False
def delete_log(self, index):
"""Delete a specific log entry by index (0-based)."""
logs = self.get_all_logs()
if 0 <= index < len(logs):
logs.pop(index)
self._rewrite_logs_file(logs)
return True
return False
# ------------------- INTERNAL HELPER -------------------
def _rewrite_logs_file(self, logs):
"""Overwrite the log file with current logs."""
with open(self.filepath, "w", encoding="utf-8") as f:
for log in logs:
f.write(
f"Timestamp: {log['timestamp']} | "
f"User: {log['user']} | "
f"Action: {log['action']} | "
f"Details: {log['details']}\n"
)
# Username filter
if userName:
filtered_logs = [log for log in filtered_logs if userName.lower() in log["user"].lower()]
return filtered_logs

View File

@@ -1,14 +1,7 @@
# return blocks
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user from model.Utilities import ResponseHandler, HtmlHelper, ItemCRUDType
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
import config import config
import mysql.connector import mysql.connector
from mysql.connector import Error
from model.ItemCRUD import ItemCRUD from model.ItemCRUD import ItemCRUD
@@ -19,70 +12,123 @@ class Village:
def __init__(self): def __init__(self):
self.isSuccess = False self.isSuccess = False
self.resultMessage = "" self.resultMessage = ""
self.village = ItemCRUD(itemType=ItemCRUDType.Village)
# 🔹 Helper: sync status
def _set_status(self, village):
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
# 🔹 Helper: get request data
def _get_form_data(self, request):
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
return block_id, village_name
def AddVillage(self, request): def AddVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village) block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id') if not village_name:
village_name = request.form.get('Village_Name', '').strip() self.isSuccess = False
self.resultMessage = "Village name cannot be empty"
village.AddItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlock", storedprocadd="SaveVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return return
#self.isSuccess = False
try:
self.village.AddItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlock",
storedprocadd="SaveVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllVillages(self, request): def GetAllVillages(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagesdata = village.GetAllData(request=request, storedproc="GetAllVillages") try:
self.isSuccess = village.isSuccess villagesdata = self.village.GetAllData(
self.resultMessage = village.resultMessage request=request,
storedproc="GetAllVillages"
)
self._set_status(self.village)
return villagesdata return villagesdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return []
def CheckVillage(self, request): def CheckVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village) block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip() if not village_name:
result = village.CheckItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlocks") self.isSuccess = False
self.isSuccess = village.isSuccess self.resultMessage = "Village name cannot be empty"
self.resultMessage = village.resultMessage return None
try:
result = self.village.CheckItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlocks"
)
self._set_status(self.village)
return result return result
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def DeleteVillage(self, request, village_id): def DeleteVillage(self, request, village_id):
village = ItemCRUD(itemType=ItemCRUDType.Village) try:
self.village.DeleteItem(
request=request,
itemID=village_id,
storedprocDelete="DeleteVillage"
)
self._set_status(self.village)
village.DeleteItem(request=request, itemID=village_id, storedprocDelete="DeleteVillage" ) except Exception as e:
self.isSuccess = village.isSuccess self.isSuccess = False
self.resultMessage = village.resultMessage self.resultMessage = str(e)
return
def EditVillage(self, request, village_id): def EditVillage(self, request, village_id):
corsor=None block_id, village_name = self._get_form_data(request)
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id') if not village_name:
village_name = request.form.get('Village_Name', '').strip() self.isSuccess = False
self.resultMessage = "Village name cannot be empty"
village.EditItem(request=request,childid=village_id,parentid=block_id,childname=village_name,storedprocupdate="UpdateVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return return
# def GetVillageByID(self, request, id): try:
self.village.EditItem(
request=request,
childid=village_id,
parentid=block_id,
childname=village_name,
storedprocupdate="UpdateVillage"
)
self._set_status(self.village)
# village = ItemCRUD(itemType=ItemCRUDType.Village) except Exception as e:
# villagedetailsdata = village.GetAllData(request=request, storedproc="GetVillageDetailsById") self.isSuccess = False
# self.isSuccess = village.isSuccess self.resultMessage = str(e)
# self.resultMessage = village.resultMessage
# return villagedetailsdata def GetVillageByID(self, id):
try:
villagedetailsdata = self.village.GetDataByID(
id=id,
storedproc="GetVillageDetailsById"
)
def GetVillageByID(self, request, id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagedetailsdata = village.GetDataByID(id=id,storedproc="GetVillageDetailsById")
if villagedetailsdata: if villagedetailsdata:
self.isSuccess = True self.isSuccess = True
else: else:
@@ -91,31 +137,37 @@ class Village:
return villagedetailsdata return villagedetailsdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def GetAllBlocks(self, request): def GetAllBlocks(self):
blocks = [] blocks = []
self.isSuccess = False self.isSuccess = False
self.resultMessage = "" self.resultMessage = ""
connection = config.get_db_connection()
connection = config.get_db_connection()
if not connection: if not connection:
return [] return []
cursor = connection.cursor()
try: try:
with connection.cursor() as cursor:
cursor.callproc('GetAllBlocks') cursor.callproc('GetAllBlocks')
for result in cursor.stored_results(): for result in cursor.stored_results():
blocks = result.fetchall() blocks.extend(result.fetchall())
self.isSuccess = True self.isSuccess = True
return blocks
except mysql.connector.Error as e: except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}") print(f"Error fetching blocks: {e}")
self.isSuccess = False self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("block"), 500) self.resultMessage = HtmlHelper.json_response(
finally: ResponseHandler.fetch_failure("block"), 500
cursor.close() )
connection.close() return []
return blocks finally:
connection.close()

View File

@@ -1,150 +1,246 @@
import config # from flask import request
import mysql.connector # from model.ItemCRUD import ItemCRUD
# from model.Utilities import ItemCRUDType
class GSTReleasemodel: # class GSTRelease:
# """CRUD operations for GST Release using ItemCRUD"""
@staticmethod # def __init__(self):
def get_connection(): # self.isSuccess = False
connection = config.get_db_connection() # self.resultMessage = ""
if not connection:
return None
return connection
@staticmethod # # ------------------- Add GST Release -------------------
def fetch_all_gst_releases(): # def AddGSTRelease(self, request):
connection = GSTReleasemodel.get_connection() # pmc_no = request.form.get('PMC_No', '').strip()
gst_releases = [] # invoice_no = request.form.get('invoice_No', '').strip()
if connection:
cursor = connection.cursor(dictionary=True)
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.AddItem(
# request=request,
# parentid=None,
# childname=f"{pmc_no}-{invoice_no}",
# storedprocfetch="CheckGSTReleaseExists",
# storedprocadd="AddGSTReleaseFromExcel" # your stored procedure handles extra fields
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# # ------------------- Get All GST Releases -------------------
# def GetAllGSTReleases(self):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# # Pass request=None for fetch
# rows = gst.GetAllData(request=None, storedproc="GetAllGSTReleases")
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# data = []
# for row in rows:
# data.append({
# "gst_release_id": row[0],
# "pmc_no": row[1],
# "invoice_no": row[2],
# "basic_amount": row[3],
# "final_amount": row[4],
# "total_amount": row[5],
# "utr": row[6],
# "contractor_id": row[7]
# })
# return data
# # ------------------- Get GST Release By ID -------------------
# def GetGSTReleaseByID(self, gst_release_id):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# row = gst.GetDataByID(gst_release_id, request=None, storedproc="GetGSTReleaseById")
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# if row:
# return {
# "gst_release_id": row[0],
# "pmc_no": row[1],
# "invoice_no": row[2],
# "basic_amount": row[3],
# "final_amount": row[4],
# "total_amount": row[5],
# "utr": row[6],
# "contractor_id": row[7]
# }
# return None
# # ------------------- Edit GST Release -------------------
# def EditGSTRelease(self, request, gst_release_id):
# pmc_no = request.form.get('PMC_No', '').strip()
# invoice_no = request.form.get('invoice_No', '').strip()
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.EditItem(
# request=request,
# childid=gst_release_id,
# parentid=None,
# childname=f"{pmc_no}-{invoice_no}",
# storedprocupdate="UpdateGSTRelease" # stored procedure handles extra fields
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# # ------------------- Delete GST Release -------------------
# def DeleteGSTRelease(self, gst_release_id):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.DeleteItem(
# itemID=gst_release_id,
# request=None,
# storedprocDelete="DeleteGSTReleaseById"
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
from flask import request, jsonify
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GSTRelease:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ------------------- Add GST Release -------------------
def AddGSTRelease(self, request):
try: try:
cursor.callproc('GetAllGSTReleases') gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst_releases = [] data = {
for result in cursor.stored_results(): # change to procedure "PMC_No": request.form.get("PMC_No", "").strip(),
gst_releases = result.fetchall() "Invoice_No": request.form.get("Invoice_No", "").strip(),
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
"UTR": request.form.get("UTR", "").strip(),
"Contractor_ID": int(request.form.get("Contractor_ID", 0) or 0)
}
except mysql.connector.Error as e: gst.AddItem(
print(f"Error fetching GST releases: {e}") request=request,
finally: data=data,
cursor.close() storedprocfetch="CheckGSTReleaseExists",
connection.close() storedprocadd="AddGSTReleaseFromExcel"
return gst_releases
@staticmethod
def insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
# Insert into gst_release
cursor.callproc(
'InsertGSTReleaseOnly',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id]
) )
# Insert into inpayment self.isSuccess = gst.isSuccess
cursor.callproc( self.resultMessage = str(gst.resultMessage)
'InsertInpaymentOnly',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id] except Exception as e:
print("ERROR in AddGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Edit GST Release -------------------
def EditGSTRelease(self, request, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
data = {
"PMC_No": request.form.get("PMC_No", "").strip(),
"Invoice_No": request.form.get("Invoice_No", "").strip(),
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
"UTR": request.form.get("UTR", "").strip()
}
gst.EditItem(
request=request,
childid=gst_release_id,
data=data,
storedprocupdate="UpdateGSTRelease"
) )
connection.commit() self.isSuccess = gst.isSuccess
return True self.resultMessage = str(gst.resultMessage)
except mysql.connector.Error as e:
print(f"Error inserting GST release: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod except Exception as e:
def fetch_gst_release_by_id(gst_release_id): print("ERROR in EditGSTRelease:", e)
connection = GSTReleasemodel.get_connection() self.isSuccess = False
if not connection: self.resultMessage = str(e)
return None
data = {} return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Delete GST Release -------------------
def DeleteGSTRelease(self, gst_release_id):
try: try:
cursor = connection.cursor(dictionary=True) gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
cursor.callproc('GetGSTReleaseById', [gst_release_id]) gst.DeleteItem(
request=None,
itemID=gst_release_id,
storedprocDelete="DeleteGSTReleaseById"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in DeleteGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Get All GST Releases -------------------
def GetAllGSTReleases(self):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
rows = gst.GetAllData(None, "GetAllGSTReleases")
data = []
for row in rows:
data.append({
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
})
for result in cursor.stored_results():
data = result.fetchone()
if data:
# Convert to array for template
data = [
data.get('GST_Release_Id'),
data.get('PMC_No'),
data.get('Invoice_No'),
data.get('Basic_Amount'),
data.get('Final_Amount'),
data.get('Total_Amount'),
data.get('UTR')
]
except mysql.connector.Error as e:
print(f"Error fetching GST release by id: {e}")
finally:
cursor.close()
connection.close()
return data return data
@staticmethod except Exception as e:
def update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr): print("ERROR in GetAllGSTReleases:", e)
connection = GSTReleasemodel.get_connection() return []
if not connection:
return False # ------------------- Get GST Release By ID -------------------
def GetGSTReleaseByID(self, gst_release_id):
try: try:
cursor = connection.cursor() gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# Update gst_release
cursor.callproc(
'UpdateGSTRelease',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, gst_release_id]
)
# Update inpayment
cursor.callproc(
'UpdateInpaymentByUTR',
[basic_amount, final_amount, total_amount, utr]
)
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating GST release: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod row = gst.GetDataByID(gst_release_id, "GetGSTReleaseById")
def delete_gst_release(gst_release_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return False, None
try:
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetGSTReleaseUTRById', [gst_release_id])
record = None
for result in cursor.stored_results():
record = result.fetchone()
if not record: if row:
return False, None return {
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
}
utr = record['UTR'] return None
# Step 1: Delete gst_release except Exception as e:
cursor.callproc('DeleteGSTReleaseById', [gst_release_id]) print("ERROR in GetGSTReleaseByID:", e)
return None
# Step 2: Reset inpayment using UTR
cursor.callproc('ResetInpaymentByUTR', [utr])
connection.commit()
return True, utr
except mysql.connector.Error as e:
print(f"Error deleting GST release: {e}")
return False, None
finally:
cursor.close()
connection.close()

View File

@@ -1,8 +1,13 @@
import config import config
import mysql.connector import mysql.connector
import config
import mysql.connector
from enum import Enum
from model.Utilities import ItemCRUDType
class Paymentmodel: class Paymentmodel:
# ---------------- Database Connection ----------------
@staticmethod @staticmethod
def get_connection(): def get_connection():
connection = config.get_db_connection() connection = config.get_db_connection()
@@ -10,6 +15,7 @@ class Paymentmodel:
return None return None
return connection return connection
# ---------------- Payment Methods ----------------
@staticmethod @staticmethod
def fetch_all_payments(): def fetch_all_payments():
connection = Paymentmodel.get_connection() connection = Paymentmodel.get_connection()
@@ -52,13 +58,7 @@ class Paymentmodel:
try: try:
cursor = connection.cursor() cursor = connection.cursor()
cursor.callproc('UpdateInpaymentRecord', [ cursor.callproc('UpdateInpaymentRecord', [
subcontractor_id, subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr
pmc_no,
invoice_no,
amount,
tds_amount,
total_amount,
utr
]) ])
connection.commit() connection.commit()
return True return True
@@ -80,7 +80,6 @@ class Paymentmodel:
cursor.callproc("GetPaymentById", (payment_id,)) cursor.callproc("GetPaymentById", (payment_id,))
for result in cursor.stored_results(): for result in cursor.stored_results():
payment_data = result.fetchone() payment_data = result.fetchone()
# Convert to array for template
if payment_data: if payment_data:
payment_data = [ payment_data = [
payment_data.get('Payment_Id'), payment_data.get('Payment_Id'),
@@ -117,42 +116,99 @@ class Paymentmodel:
@staticmethod @staticmethod
def delete_payment(payment_id): def delete_payment(payment_id):
"""
Deletes a payment and resets the related inpayment fields in one go.
Returns (success, pmc_no, invoice_no)
"""
connection = Paymentmodel.get_connection() connection = Paymentmodel.get_connection()
if not connection: if not connection:
return False, None, None return False, None, None
try: try:
cursor = connection.cursor(dictionary=True) cursor = connection.cursor(dictionary=True)
# Fetch PMC & Invoice before deleting
cursor.callproc('GetPaymentPMCInvoiceById', [payment_id]) cursor.callproc('GetPaymentPMCInvoiceById', [payment_id])
record = {} record = {}
for result in cursor.stored_results(): for result in cursor.stored_results():
record = result.fetchone() or {} record = result.fetchone() or {}
if not record: if not record:
return False, None, None return False, None, None
pmc_no = record['PMC_No'] pmc_no = record['PMC_No']
invoice_no = record['Invoice_No'] invoice_no = record['Invoice_No']
# Delete payment
# Step 2: Delete the payment using the stored procedure
cursor.callproc("DeletePayment", (payment_id,)) cursor.callproc("DeletePayment", (payment_id,))
connection.commit() connection.commit()
# Reset inpayment fields
# Step 3: Reset inpayment fields using the stored procedure
cursor.callproc("ResetInpayment", [pmc_no, invoice_no]) cursor.callproc("ResetInpayment", [pmc_no, invoice_no])
connection.commit() connection.commit()
return True, pmc_no, invoice_no return True, pmc_no, invoice_no
except mysql.connector.Error as e: except mysql.connector.Error as e:
print(f"Error deleting payment: {e}") print(f"Error deleting payment: {e}")
return False, None, None return False, None, None
finally:
cursor.close()
connection.close()
# ---------------- Item CRUD Methods ----------------
@staticmethod
def fetch_items(item_type: ItemCRUDType):
connection = Paymentmodel.get_connection()
items = []
if connection:
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetItemsByType', [item_type.value])
for result in cursor.stored_results():
items = result.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching {item_type.name}: {e}")
finally:
cursor.close()
connection.close()
return items
@staticmethod
def insert_item(item_type: ItemCRUDType, name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('InsertItem', [item_type.value, name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error inserting {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def update_item(item_type: ItemCRUDType, item_id: int, new_name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('UpdateItem', [item_type.value, item_id, new_name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def delete_item(item_type: ItemCRUDType, item_id: int):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('DeleteItem', [item_type.value, item_id])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error deleting {item_type.name}: {e}")
return False
finally: finally:
cursor.close() cursor.close()
connection.close() connection.close()

View File

@@ -1,5 +1,6 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<title>Activity Logs</title> <title>Activity Logs</title>
@@ -9,29 +10,37 @@
background-color: #f8f9fa; background-color: #f8f9fa;
margin: 20px; margin: 20px;
} }
h2 { h2 {
text-align: center; text-align: center;
margin-bottom: 20px; margin-bottom: 20px;
} }
form { form {
display: flex; display: flex;
gap: 10px; gap: 10px;
justify-content: center; justify-content: center;
margin-bottom: 20px; margin-bottom: 20px;
flex-wrap: wrap;
} }
input, button {
input,
button {
padding: 8px; padding: 8px;
border-radius: 6px; border-radius: 6px;
border: 1px solid #ccc; border: 1px solid #ccc;
} }
button { button {
background-color: #007bff; background-color: #007bff;
color: white; color: white;
cursor: pointer; cursor: pointer;
} }
button:hover { button:hover {
background-color: #0056b3; background-color: #0056b3;
} }
table { table {
width: 90%; width: 90%;
margin: auto; margin: auto;
@@ -39,24 +48,29 @@
background: white; background: white;
box-shadow: 0 0 8px rgba(0, 0, 0, 0.1); box-shadow: 0 0 8px rgba(0, 0, 0, 0.1);
} }
th, td {
th,
td {
padding: 12px; padding: 12px;
border: 1px solid #ddd; border: 1px solid #ddd;
text-align: center; text-align: center;
} }
th { th {
background-color: #007bff; background-color: #007bff;
color: white; color: white;
} }
tr:nth-child(even) { tr:nth-child(even) {
background-color: #f2f2f2; background-color: #f2f2f2;
} }
</style> </style>
</head> </head>
<body> <body>
<h2>Activity Logs</h2> <h2>Activity Logs</h2>
<form method="get" action="{{ url_for('activity_log') }}" class="filter-form">
<form method="get" action="{{ url_for('log.activity_log') }}" class="filter-form">
<label for="username">Username:</label> <label for="username">Username:</label>
<input type="text" id="username" name="username" placeholder="Enter username" value="{{ username or '' }}"> <input type="text" id="username" name="username" placeholder="Enter username" value="{{ username or '' }}">
@@ -66,9 +80,8 @@
<label for="end_date">End Date:</label> <label for="end_date">End Date:</label>
<input type="date" id="end_date" name="end_date" value="{{ end_date or '' }}"> <input type="date" id="end_date" name="end_date" value="{{ end_date or '' }}">
<button type="submit" class="btn btn-primary">Filter</button> <button type="submit" class="btn btn-primary">Filter</button>
<!-- <button type="button" style="background-color: #6c757d;" onclick="resetFilter()">Reset</button> --> <button type="button" style="background-color: #6c757d;" onclick="resetFilter()">Reset</button>
</form> </form>
<table> <table>
@@ -95,8 +108,9 @@
<script> <script>
function resetFilter() { function resetFilter() {
window.location.href = "{{ url_for('activity_log') }}"; window.location.href = "{{ url_for('log.activity_log') }}";
} }
</script> </script>
</body> </body>
</html> </html>