6 Commits

16 changed files with 891 additions and 896 deletions

3
.gitignore vendored
View File

@@ -3,5 +3,4 @@ venv/
__pycache__/ __pycache__/
static/downloads/ static/downloads/
static/uploads/ static/uploads/

View File

@@ -2,35 +2,27 @@ from flask import Blueprint, render_template, request, redirect, url_for, jsonif
from flask_login import login_required from flask_login import login_required
import config import config
from model.State import State
from model.Block import Block from model.Block import Block
from model.Utilities import HtmlHelper from model.Utilities import HtmlHelper
block_bp = Blueprint('block', __name__) block_bp = Blueprint('block', __name__)
# # --- Add Block page -------
@block_bp.route('/add_block', methods=['GET', 'POST']) @block_bp.route('/add_block', methods=['GET', 'POST'])
@login_required @login_required
def add_block(): def add_block():
block = Block() block = Block()
if request.method == 'POST': if request.method == 'POST':
block.AddBlock(request) block.AddBlock(request)
return block.resultMessage return block.resultMessage
connection = config.get_db_connection() state = State()
cursor = connection.cursor() states = state.GetAllStates(request=request)
cursor.callproc("GetAllStates")
for rs in cursor.stored_results():
states = rs.fetchall()
block_data = block.GetAllBlocks(request) block_data = block.GetAllBlocks(request)
cursor.close()
connection.close()
return render_template( return render_template(
'add_block.html', 'add_block.html',
states=states, states=states,

View File

@@ -27,8 +27,6 @@ def upload():
f"User {current_user.id} Upload Excel File '{file.filename}'" f"User {current_user.id} Upload Excel File '{file.filename}'"
) )
return redirect(url_for('excel.show_table', filename=file.filename)) return redirect(url_for('excel.show_table', filename=file.filename))
else:
return redirect(url_for('upload_excel_file'))
return render_template('uploadExcelFile.html') return render_template('uploadExcelFile.html')

View File

@@ -1,70 +1,46 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify from flask import Blueprint, render_template, request, redirect, url_for
from flask_login import login_required, current_user from flask_login import login_required
from model.gst_release import GSTReleasemodel from model.gst_release import GSTRelease
from model.Log import LogHelper from model.Log import LogHelper
from flask import flash, current_app
gst_release_bp = Blueprint('gst_release_bp', __name__) gst_release_bp = Blueprint('gst_release_bp', __name__)
gst_service = GSTRelease()
# ------------------- Add GST Release ------------------- # ---------------- ADD GST RELEASE ----------------
@gst_release_bp.route('/add_gst_release', methods=['GET', 'POST']) @gst_release_bp.route('/add_gst_release', methods=['GET', 'POST'])
@login_required @login_required
def add_gst_release(): def add_gst_release():
gst_releases_dict = GSTReleasemodel.fetch_all_gst_releases()
gst_releases = [
[
g['GST_Release_Id'], g['PMC_No'], g['invoice_no'],
g['Basic_Amount'], g['Final_Amount'], g['Total_Amount'], g['UTR']
] for g in gst_releases_dict
] if gst_releases_dict else []
if request.method == 'POST': if request.method == 'POST':
pmc_no = request.form['PMC_No'] gst_service.AddGSTRelease(request)
invoice_no = request.form['invoice_No'] LogHelper.log_action("Add GST Release", f"User added GST release")
basic_amount = request.form['basic_amount'] flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
contractor_id = request.form['subcontractor_id']
LogHelper.log_action("Add GST Release", f"User {current_user.id} added GST release '{pmc_no}'")
GSTReleasemodel.insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id)
return redirect(url_for('gst_release_bp.add_gst_release')) return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('add_gst_release.html', gst_releases=gst_releases, invoices=[]) gst_releases = gst_service.GetAllGSTReleases()
return render_template('add_gst_release.html', gst_releases=gst_releases)
# ---------------- EDIT GST RELEASE ----------------
# ------------------- Edit GST Release -------------------
@gst_release_bp.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST']) @gst_release_bp.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required @login_required
def edit_gst_release(gst_release_id): def edit_gst_release(gst_release_id):
gst_release_data = GSTReleasemodel.fetch_gst_release_by_id(gst_release_id) gst_data = gst_service.GetGSTReleaseByID(gst_release_id)
if not gst_release_data: if not gst_data:
return "GST Release not found", 404 return "GST Release not found", 404
if request.method == 'POST': if request.method == 'POST':
pmc_no = request.form['PMC_No'] gst_service.EditGSTRelease(request, gst_release_id)
invoice_no = request.form['invoice_No'] LogHelper.log_action("Edit GST Release", f"User edited GST release")
basic_amount = request.form['basic_amount'] flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
LogHelper.log_action("Edit GST Release", f"User {current_user.id} edited GST release '{pmc_no}'")
GSTReleasemodel.update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr)
return redirect(url_for('gst_release_bp.add_gst_release')) return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('edit_gst_release.html', gst_release_data=gst_release_data, invoices=[]) return render_template('edit_gst_release.html', gst_release_data=gst_data)
# ---------------- DELETE GST RELEASE ----------------
# ------------------- Delete GST Release -------------------
@gst_release_bp.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST']) @gst_release_bp.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required @login_required
def delete_gst_release(gst_release_id): def delete_gst_release(gst_release_id):
success, utr = GSTReleasemodel.delete_gst_release(gst_release_id) gst_service.DeleteGSTRelease(gst_release_id) # remove request
if not success: LogHelper.log_action("Delete GST Release", f"User deleted GST release")
return jsonify({"message": "GST Release not found or failed to delete", "status": "error"}), 404 flash(gst_service.resultMessage, 'success' if gst_service.isSuccess else 'error')
return redirect(url_for('gst_release_bp.add_gst_release'))
LogHelper.log_action("Delete GST Release", f"User {current_user.id} deleted GST release '{gst_release_id}'")
return jsonify({"message": f"GST Release {gst_release_id} deleted successfully.", "status": "success"}), 200

View File

@@ -1,98 +1,101 @@
# controllers/invoice_controller.py # controllers/invoice_controller.py
from flask import Blueprint, request, jsonify, render_template from flask import Blueprint, request, jsonify, render_template
from flask_login import login_required, current_user from flask_login import login_required, current_user
from model.Invoice import * from model.Invoice import *
from model.Log import LogHelper from model.Log import LogHelper
invoice_bp = Blueprint('invoice', __name__) invoice_bp = Blueprint('invoice', __name__)
# -------------------------------- Add Invoice --------------------------------- # ------------------------------- Helpers -------------------------------
@invoice_bp.route('/add_invoice', methods=['GET', 'POST']) def handle_exception(func):
@login_required """Decorator to handle exceptions and return JSON error responses."""
def add_invoice(): def wrapper(*args, **kwargs):
if request.method == 'POST':
try: try:
village_name = request.form.get('village') return func(*args, **kwargs)
village_result = get_village_id(village_name)
if not village_result:
return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
village_id = village_result['Village_Id']
data = request.form
invoice_id = insert_invoice(data, village_id)
assign_subcontractor(data, village_id)
insert_hold_types(data, invoice_id)
LogHelper.log_action("Add invoice", f"User {current_user.id} Added invoice '{data.get('pmc_no')}'")
return jsonify({"status": "success", "message": "Invoice added successfully"}), 201
except Exception as e: except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500 return jsonify({"status": "error", "message": str(e)}), 500
wrapper.__name__ = func.__name__
return wrapper
def log_action(action: str, detail: str):
LogHelper.log_action(action, f"User {current_user.id} {detail}")
# ------------------------------- Add Invoice -------------------------------
@invoice_bp.route('/add_invoice', methods=['GET', 'POST'])
@login_required
@handle_exception
def add_invoice():
if request.method == 'POST':
data = request.form
village_name = data.get('village')
village_result = get_village_id(village_name)
if not village_result:
return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
village_id = village_result['Village_Id']
invoice_id = insert_invoice(data, village_id)
assign_subcontractor(data, village_id)
insert_hold_types(data, invoice_id)
log_action("Add invoice", f"added invoice '{data.get('pmc_no')}'")
return jsonify({"status": "success", "message": "Invoice added successfully"}), 201
invoices = get_all_invoice_details() invoices = get_all_invoice_details()
villages = get_all_villages() villages = get_all_villages()
return render_template('add_invoice.html', invoices=invoices, villages=villages) return render_template('add_invoice.html', invoices=invoices, villages=villages)
# ------------------- Search Subcontractor ------------------- # ------------------------------- Search Subcontractor -------------------------------
@invoice_bp.route('/search_subcontractor', methods=['POST']) @invoice_bp.route('/search_subcontractor', methods=['POST'])
@login_required @login_required
@handle_exception
def search_subcontractor(): def search_subcontractor():
sub_query = request.form.get("query") query = request.form.get("query", "").strip()
results = search_contractors(sub_query) results = search_contractors(query)
if not results: if not results:
return "<li>No subcontractor found</li>" return "<li>No subcontractor found</li>"
output = "".join( return "".join(f"<li data-id='{r['Contractor_Id']}'>{r['Contractor_Name']}</li>" for r in results)
f"<li data-id='{row['Contractor_Id']}'>{row['Contractor_Name']}</li>"
for row in results
)
return output
# ------------------- Get Hold Types ------------------- # ------------------------------- Get Hold Types -------------------------------
@invoice_bp.route('/get_hold_types', methods=['GET']) @invoice_bp.route('/get_hold_types', methods=['GET'])
@login_required @login_required
@handle_exception
def get_hold_types(): def get_hold_types():
hold_types = get_all_hold_types() hold_types = get_all_hold_types()
LogHelper.log_action("Get hold type", f"User {current_user.id} Get hold type '{hold_types}'") log_action("Get hold type", f"retrieved hold types '{hold_types}'")
return jsonify(hold_types) return jsonify(hold_types)
# ------------------- Edit Invoice ------------------- # ------------------------------- Edit Invoice -------------------------------
@invoice_bp.route('/edit_invoice/<int:invoice_id>', methods=['GET', 'POST']) @invoice_bp.route('/edit_invoice/<int:invoice_id>', methods=['GET', 'POST'])
@login_required @login_required
@handle_exception
def edit_invoice(invoice_id): def edit_invoice(invoice_id):
if request.method == 'POST': if request.method == 'POST':
data = request.form data = request.form
update_invoice(data, invoice_id) update_invoice(data, invoice_id)
update_inpayment(data) update_inpayment(data)
log_action("Edit invoice", f"edited invoice '{invoice_id}'")
LogHelper.log_action("Edit invoice", f"User {current_user.id} Edit invoice '{invoice_id}'")
return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200 return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200
invoice = get_invoice_by_id(invoice_id) invoice = get_invoice_by_id(invoice_id)
return render_template('edit_invoice.html', invoice=invoice) return render_template('edit_invoice.html', invoice=invoice)
# ------------------- Delete Invoice ------------------- # ------------------------------- Delete Invoice -------------------------------
@invoice_bp.route('/delete_invoice/<int:invoice_id>', methods=['GET']) @invoice_bp.route('/delete_invoice/<int:invoice_id>', methods=['GET'])
@login_required @login_required
@handle_exception
def delete_invoice_route(invoice_id): def delete_invoice_route(invoice_id):
try: delete_invoice_data(invoice_id, current_user.id)
delete_invoice_data(invoice_id, current_user.id) log_action("Delete invoice", f"deleted invoice '{invoice_id}'")
LogHelper.log_action("Delete Invoice", f"User {current_user.id} deleted Invoice '{invoice_id}'") return jsonify({"status": "success", "message": f"Invoice {invoice_id} deleted successfully."})
return jsonify({
"message": f"Invoice {invoice_id} deleted successfully.",
"status": "success"
})
except Exception as e:
return jsonify({
"message": str(e),
"status": "error"
}), 500

View File

@@ -1,4 +1,4 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify from flask import Blueprint, render_template, request, redirect, url_for, jsonify, flash
from flask_login import login_required, current_user from flask_login import login_required, current_user
from model.payment import Paymentmodel from model.payment import Paymentmodel
from model.Log import LogHelper from model.Log import LogHelper
@@ -88,16 +88,14 @@ def edit_payment(payment_id):
# ------------------- Delete Payment ------------------- # ------------------- Delete Payment -------------------
@payment_bp.route('/delete_payment/<int:payment_id>', methods=['GET', 'POST']) @payment_bp.route('/delete_payment/<int:payment_id>', methods=['POST'])
@login_required @login_required
def delete_payment(payment_id): def delete_payment(payment_id):
success, pmc_no, invoice_no = Paymentmodel.delete_payment(payment_id) success, pmc_no, invoice_no = Paymentmodel.delete_payment(payment_id)
if not success: if not success:
return jsonify({"message": "Payment not found or failed to delete", "status": "error"}), 404 flash("Payment not found or failed to delete", "error")
else:
LogHelper.log_action("Delete Payment", f"User {current_user.id} deleted Payment '{payment_id}'")
flash(f"Payment ID {payment_id} deleted successfully.", "success")
LogHelper.log_action("Delete Payment", f"User {current_user.id} deleted Payment '{payment_id}'") return redirect(url_for('payment_bp.add_payment'))
return jsonify({
"message": f"Payment ID {payment_id} deleted successfully.",
"status": "success"
}), 200

View File

@@ -1,34 +1,11 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required from flask_login import login_required
from model.Subcontractor import Subcontractor from model.Subcontractor import Subcontractor
from model.Utilities import HtmlHelper, ResponseHandler
subcontractor_bp = Blueprint('subcontractor', __name__) subcontractor_bp = Blueprint('subcontractor', __name__)
# ----------------------------------------------------------
# Helpers (unchanged)
# ----------------------------------------------------------
class HtmlHelper:
@staticmethod
def json_response(data, status=200):
return jsonify(data), status
class ResponseHandler:
@staticmethod
def fetch_failure(entity):
return {"status": "error", "message": f"Failed to fetch {entity}"}
@staticmethod
def add_failure(entity):
return {"status": "error", "message": f"Failed to add {entity}"}
@staticmethod
def update_failure(entity):
return {"status": "error", "message": f"Failed to update {entity}"}
@staticmethod
def delete_failure(entity):
return {"status": "error", "message": f"Failed to delete {entity}"}
# ---------------------------------------------------------- # ----------------------------------------------------------
# LIST + ADD # LIST + ADD

View File

@@ -1,8 +1,11 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required from flask_login import login_required
import config import config
from model.Village import Village from model.Village import Village
from model.State import State from model.State import State
@@ -23,7 +26,6 @@ def add_village():
state = State() state = State()
states = state.GetAllStates(request=request) states = state.GetAllStates(request=request)
villages = village.GetAllVillages(request=request) villages = village.GetAllVillages(request=request)
return render_template( return render_template(
@@ -42,7 +44,6 @@ def get_districts(state_id):
cursor = connection.cursor() cursor = connection.cursor()
cursor.callproc("GetDistrictByStateID", [state_id]) cursor.callproc("GetDistrictByStateID", [state_id])
districts = [] districts = []
for rs in cursor.stored_results(): for rs in cursor.stored_results():
@@ -51,15 +52,7 @@ def get_districts(state_id):
cursor.close() cursor.close()
connection.close() connection.close()
district_list = [] return jsonify([{"id": d[0], "name": d[1]} for d in districts])
for d in districts:
district_list.append({
"id": d[0],
"name": d[1]
})
return jsonify(district_list)
# ------------------------- Fetch Blocks ------------------------- # ------------------------- Fetch Blocks -------------------------
@@ -71,7 +64,6 @@ def get_blocks(district_id):
cursor = connection.cursor() cursor = connection.cursor()
cursor.callproc("GetBlocksByDistrictID", [district_id]) cursor.callproc("GetBlocksByDistrictID", [district_id])
blocks = [] blocks = []
for rs in cursor.stored_results(): for rs in cursor.stored_results():
@@ -80,22 +72,13 @@ def get_blocks(district_id):
cursor.close() cursor.close()
connection.close() connection.close()
block_list = [] return jsonify([{"id": b[0], "name": b[1]} for b in blocks])
for b in blocks:
block_list.append({
"id": b[0],
"name": b[1]
})
return jsonify(block_list)
# ------------------------- Check Village ------------------------- # ------------------------- Check Village -------------------------
@village_bp.route('/check_village', methods=['POST']) @village_bp.route('/check_village', methods=['POST'])
@login_required @login_required
def check_village(): def check_village():
village = Village() village = Village()
return village.CheckVillage(request=request) return village.CheckVillage(request=request)
@@ -106,14 +89,9 @@ def check_village():
def delete_village(village_id): def delete_village(village_id):
village = Village() village = Village()
village.DeleteVillage(request=request, village_id=village_id) village.DeleteVillage(request=request, village_id=village_id)
if not village.isSuccess: flash(village.resultMessage, "success" if village.isSuccess else "error")
flash(village.resultMessage, "error")
else:
flash(village.resultMessage, "success")
return redirect(url_for('village.add_village')) return redirect(url_for('village.add_village'))
@@ -135,8 +113,8 @@ def edit_village(village_id):
else: else:
flash(village.resultMessage, "error") flash(village.resultMessage, "error")
village_data = village.GetVillageByID(request=request, id=village_id) village_data = village.GetVillageByID(id=village_id) or []
blocks = village.GetAllBlocks(request=request) blocks = village.GetAllBlocks() or []
return render_template( return render_template(
'edit_village.html', 'edit_village.html',
@@ -145,23 +123,17 @@ def edit_village(village_id):
) )
else: else:
# ✅ FIXED HERE (removed request)
village_data = village.GetVillageByID(request=request, id=village_id) village_data = village.GetVillageByID(id=village_id)
if not village.isSuccess: if not village.isSuccess:
flash(village.resultMessage, "error") flash(village.resultMessage, "error")
return redirect(url_for('village.add_village')) return redirect(url_for('village.add_village'))
blocks = village.GetAllBlocks(request=request) blocks = village.GetAllBlocks() or []
if village_data is None:
village_data = []
if blocks is None:
blocks = []
return render_template( return render_template(
'edit_village.html', 'edit_village.html',
village_data=village_data, village_data=village_data or [],
blocks=blocks blocks=blocks
) )

View File

@@ -1,72 +1,65 @@
import mysql.connector
from mysql.connector import Error from mysql.connector import Error
import config import config
import openpyxl
import os
import re
import ast
from datetime import datetime from datetime import datetime
class ContractorInfo: class ContractorInfo:
ID = "" def __init__(self, contractor_id):
contInfo = None self.ID = contractor_id
def __init__(self, id): self.contInfo = None
self.ID = id
print(id)
self.fetchData() self.fetchData()
def fetchData(self): def fetchData(self):
"""Fetch basic contractor info by ID."""
try: try:
connection = config.get_db_connection() connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True) with connection.cursor(dictionary=True, buffered=True) as cursor:
cursor.callproc('GetContractorInfoById', [self.ID]) cursor.callproc('GetContractorInfoById', [self.ID])
for result in cursor.stored_results(): # Get the first result set
self.contInfo = result.fetchone() for result in cursor.stored_results():
self.contInfo = result.fetchone()
print(self.contInfo,flush=True) except Error as e:
print(f"Error fetching contractor info: {e}")
finally: finally:
cursor.close() if connection.is_connected():
connection.close() connection.close()
def fetchalldata(self): def fetchalldata(self):
"""Fetch hold types and invoices for contractor."""
data = {}
try: try:
connection = config.get_db_connection() connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True) with connection.cursor(dictionary=True, buffered=True) as cursor:
print("here", flush=True) # Fetch Hold Types
cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
data['hold_types'] = hold_type_map
# ---------------- Hold Types ---------------- # Fetch Invoices
cursor = connection.cursor(dictionary=True) cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.callproc('GetHoldTypesByContractor', [self.ID]) # Remove duplicate invoices
seen_ids = set()
hold_types = [] unique_invoices = []
for result in cursor.stored_results(): for inv in invoices:
hold_types = result.fetchall() if inv['Invoice_Id'] not in seen_ids:
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} seen_ids.add(inv['Invoice_Id'])
unique_invoices.append(inv)
# ---------------- Invoices ---------------- data['invoices'] = unique_invoices
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
# Remove duplicate invoices
invoice_ids_seen = set()
unique_invoices = []
for inv in invoices:
if inv["Invoice_Id"] not in invoice_ids_seen:
invoice_ids_seen.add(inv["Invoice_Id"])
unique_invoices.append(inv)
invoices = unique_invoices
except Error as e:
print(f"Error fetching contractor data: {e}")
finally: finally:
cursor.close() if connection.is_connected():
connection.close() connection.close()
return data

View File

@@ -1,55 +1,51 @@
import config from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GST: class GST:
@staticmethod @staticmethod
def get_unreleased_gst(): def get_unreleased_gst():
# Use ItemCRUD for Invoices
invoice_crud = ItemCRUD(itemType=ItemCRUDType.Invoice)
invoices_rows = invoice_crud.GetAllData(storedproc="GetAllInvoicesBasic")
connection = config.get_db_connection() if not invoice_crud.isSuccess:
cursor = connection.cursor(dictionary=True) return [] # Could also log invoice_crud.resultMessage
try: invoices = [
# ----------- Invoices ----------- dict(
cursor.callproc('GetAllInvoicesBasic') Invoice_No=row[1],
invoices = [] GST_SD_Amount=float(row[2]) if row[2] is not None else 0
for result in cursor.stored_results(): )
invoices = result.fetchall() for row in invoices_rows
]
# Use ItemCRUD for GST Releases
gst_crud = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst_rows = gst_crud.GetAllData(storedproc="GetAllGSTReleasesBasic")
# ----------- GST Releases ----------- if not gst_crud.isSuccess:
cursor.callproc('GetAllGSTReleasesBasic') return [] # Could also log gst_crud.resultMessage
gst_releases = []
for result in cursor.stored_results():
gst_releases = result.fetchall()
gst_invoice_nos = { gst_invoice_nos = {
g['Invoice_No'] g[2] # Invoice_No is at index 2
for g in gst_releases for g in gst_rows
if g['Invoice_No'] if g[2]
} }
gst_basic_amounts = { gst_basic_amounts = {
float(g['Basic_Amount']) float(g[3]) # Basic_Amount at index 3
for g in gst_releases for g in gst_rows
if g['Basic_Amount'] is not None if g[3] is not None
} }
unreleased = [] # Filter unreleased invoices
unreleased = []
for inv in invoices:
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
match_by_gst_amount = inv['GST_SD_Amount'] in gst_basic_amounts
for inv in invoices: if not (match_by_invoice or match_by_gst_amount):
unreleased.append(inv)
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos return unreleased
match_by_gst_amount = float(
inv.get('GST_SD_Amount') or 0
) in gst_basic_amounts
if not (match_by_invoice or match_by_gst_amount):
unreleased.append(inv)
return unreleased
finally:
cursor.close()
connection.close()

View File

@@ -1,35 +1,76 @@
import config import config
import mysql.connector import mysql.connector
# ------------------- Helper ------------------- # ------------------- Helper Functions -------------------
def clear_results(cursor): def clear_results(cursor):
"""Consume all stored results to prevent cursor issues."""
for r in cursor.stored_results(): for r in cursor.stored_results():
r.fetchall() r.fetchall()
def fetch_one(cursor):
"""Fetch first row from stored results."""
for result in cursor.stored_results():
return result.fetchone()
return None
# ------------------- Get Village Id ------------------- def fetch_all(cursor):
def get_village_id(village_name): """Fetch all rows from stored results."""
data = []
for result in cursor.stored_results():
data.extend(result.fetchall())
return data
def get_numeric_values(data):
"""Return numeric fields for invoices safely."""
return [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
def execute_db_operation(operation_func):
"""General DB operation wrapper with commit/rollback."""
connection = config.get_db_connection() connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True) cursor = connection.cursor(dictionary=True)
cursor.callproc("GetVillageIdByName", (village_name,))
village_result = None
for rs in cursor.stored_results():
village_result = rs.fetchone()
cursor.close()
connection.close()
return village_result
# ------------------- Insert Invoice -------------------
def insert_invoice(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try: try:
# 1. Insert Invoice result = operation_func(cursor)
connection.commit()
return result
except Exception:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
# ------------------- Village Functions -------------------
def get_village_id(village_name):
def operation(cursor):
cursor.callproc("GetVillageIdByName", (village_name,))
return fetch_one(cursor)
return execute_db_operation(operation)
def get_all_villages():
def operation(cursor):
cursor.callproc("GetAllVillages")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Invoice Functions -------------------
def insert_invoice(data, village_id):
def operation(cursor):
# Insert invoice
cursor.callproc('InsertInvoice', [ cursor.callproc('InsertInvoice', [
data.get('pmc_no'), data.get('pmc_no'),
village_id, village_id,
@@ -37,29 +78,14 @@ def insert_invoice(data, village_id):
data.get('invoice_details'), data.get('invoice_details'),
data.get('invoice_date'), data.get('invoice_date'),
data.get('invoice_no'), data.get('invoice_no'),
float(data.get('basic_amount') or 0), *get_numeric_values(data)
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0)
]) ])
invoice_row = fetch_one(cursor)
invoice_id = None if not invoice_row:
for result in cursor.stored_results():
row = result.fetchone()
if row:
invoice_id = row.get('invoice_id')
if not invoice_id:
raise Exception("Invoice ID not returned") raise Exception("Invoice ID not returned")
invoice_id = invoice_row.get('invoice_id')
# 2. Insert Inpayment # Insert inpayment
cursor.callproc('InsertInpayment', [ cursor.callproc('InsertInpayment', [
data.get('pmc_no'), data.get('pmc_no'),
village_id, village_id,
@@ -67,221 +93,41 @@ def insert_invoice(data, village_id):
data.get('invoice_details'), data.get('invoice_details'),
data.get('invoice_date'), data.get('invoice_date'),
data.get('invoice_no'), data.get('invoice_no'),
float(data.get('basic_amount') or 0), *get_numeric_values(data),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
data.get('subcontractor_id') data.get('subcontractor_id')
]) ])
clear_results(cursor) clear_results(cursor)
connection.commit()
return invoice_id return invoice_id
except Exception as e: return execute_db_operation(operation)
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Assign Subcontractor -------------------
def assign_subcontractor(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Insert Hold Types -------------------
def insert_hold_types(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = None
for result in cursor.stored_results():
hold_type_result = result.fetchone()
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
hold_amount = float(hold_amount or 0)
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
hold_amount
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Get All Invoices -------------------
def get_all_invoice_details(): def get_all_invoice_details():
connection = config.get_db_connection() def operation(cursor):
cursor = connection.cursor(dictionary=True) cursor.callproc('GetAllInvoiceDetails')
return fetch_all(cursor)
return execute_db_operation(operation)
cursor.callproc('GetAllInvoiceDetails')
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.close()
connection.close()
return invoices
# ------------------- Get All Villages -------------------
def get_all_villages():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllVillages")
villages = []
for result in cursor.stored_results():
villages = result.fetchall()
cursor.close()
connection.close()
return villages
# ------------------- Search Contractors -------------------
def search_contractors(sub_query):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('SearchContractorsByName', [sub_query])
results = []
for result in cursor.stored_results():
results = result.fetchall()
cursor.close()
connection.close()
return results
# ------------------- Get All Hold Types -------------------
def get_all_hold_types():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllHoldTypes")
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
cursor.close()
connection.close()
return hold_types
# ------------------- Get Invoice By Id -------------------
def get_invoice_by_id(invoice_id): def get_invoice_by_id(invoice_id):
connection = config.get_db_connection() def operation(cursor):
cursor = connection.cursor(dictionary=True) cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = fetch_one(cursor)
cursor.callproc('GetInvoiceDetailsById', [invoice_id]) cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
invoice = None hold_amounts = fetch_all(cursor)
for result in cursor.stored_results(): if invoice:
invoice = result.fetchone() invoice["hold_amounts"] = hold_amounts
return invoice
return execute_db_operation(operation)
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = []
for result in cursor.stored_results():
hold_amounts = result.fetchall()
if invoice:
invoice["hold_amounts"] = hold_amounts
cursor.close()
connection.close()
return invoice
# ------------------- Update Invoice -------------------
def update_invoice(data, invoice_id): def update_invoice(data, invoice_id):
connection = config.get_db_connection() def operation(cursor):
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc("GetVillageIdByName", (data.get('village'),)) cursor.callproc("GetVillageIdByName", (data.get('village'),))
village = None village = fetch_one(cursor)
if not village:
for rs in cursor.stored_results(): raise Exception("Village not found")
village = rs.fetchone()
village_id = village['Village_Id'] village_id = village['Village_Id']
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
cursor.callproc('UpdateInvoice', [ cursor.callproc('UpdateInvoice', [
data.get('pmc_no'), data.get('pmc_no'),
village_id, village_id,
@@ -289,91 +135,101 @@ def update_invoice(data, invoice_id):
data.get('invoice_details'), data.get('invoice_details'),
data.get('invoice_date'), data.get('invoice_date'),
data.get('invoice_no'), data.get('invoice_no'),
*numeric, *get_numeric_values(data),
invoice_id invoice_id
]) ])
clear_results(cursor) clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Update Inpayment -------------------
def update_inpayment(data): def update_inpayment(data):
connection = config.get_db_connection() def operation(cursor):
cursor = connection.cursor(dictionary=True)
try:
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
cursor.callproc('UpdateInpayment', [ cursor.callproc('UpdateInpayment', [
data.get('work_type'), data.get('work_type'),
data.get('invoice_details'), data.get('invoice_details'),
data.get('invoice_date'), data.get('invoice_date'),
*numeric, *get_numeric_values(data),
data.get('pmc_no'), data.get('pmc_no'),
data.get('invoice_no') data.get('invoice_no')
]) ])
clear_results(cursor) clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Delete Invoice -------------------
def delete_invoice_data(invoice_id, user_id): def delete_invoice_data(invoice_id, user_id):
connection = config.get_db_connection() def operation(cursor):
cursor = connection.cursor(dictionary=True) # Fetch PMC and Invoice_No from DB
cursor.callproc('GetInvoicePMCById', (invoice_id,))
try:
cursor.callproc('GetInvoicePMCById', [invoice_id])
record = {} record = {}
for result in cursor.stored_results(): for result in cursor.stored_results():
record = result.fetchone() or {} record = result.fetchone() or {}
if not record: if not record:
raise Exception("Invoice not found") raise Exception("Invoice not found")
# Use exact DB keys
pmc_no = record['PMC_No']
invoice_no = record['Invoice_No']
# Delete invoice
cursor.callproc("DeleteInvoice", (invoice_id,)) cursor.callproc("DeleteInvoice", (invoice_id,))
clear_results(cursor) clear_results(cursor)
cursor.callproc( # Delete inpayment
'DeleteInpaymentByPMCInvoice', cursor.callproc('DeleteInpaymentByPMCInvoice', (pmc_no, invoice_no))
[record['PMC_No'], record['invoice_no']] clear_results(cursor)
)
connection.commit() execute_db_operation(operation)
except Exception as e:
connection.rollback()
raise e
finally: # ------------------- Subcontractor Functions -------------------
cursor.close() def assign_subcontractor(data, village_id):
connection.close() def operation(cursor):
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
execute_db_operation(operation)
# ------------------- Hold Types Functions -------------------
def insert_hold_types(data, invoice_id):
def operation(cursor):
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = fetch_one(cursor)
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
float(hold_amount or 0)
])
clear_results(cursor)
execute_db_operation(operation)
def get_all_hold_types():
def operation(cursor):
cursor.callproc("GetAllHoldTypes")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Contractor Functions -------------------
def search_contractors(sub_query):
def operation(cursor):
cursor.callproc('SearchContractorsByName', [sub_query])
return fetch_all(cursor)
return execute_db_operation(operation)

View File

@@ -1,39 +1,29 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import os import os
from datetime import datetime
from flask import current_app
from flask_login import current_user
class LogHelper: class LogHelper:
@staticmethod @staticmethod
def log_action(action, details=""): def log_action(action, details=""):
"""Log user actions with timestamp, user, action, and details.""" """Add a log entry."""
logData = LogData() log_data = LogData()
logData.WriteLog(action, details="") log_data.add_log(action, details)
class LogData: class LogData:
filepath = ""
timestamp = None
def __init__(self): def __init__(self):
self.filepath = os.path.join(current_app.root_path, 'activity.log') self.filepath = os.path.join(current_app.root_path, 'activity.log')
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.user = getattr(current_user, "cn", None) \
or getattr(current_user, "username", None) \
or getattr(current_user, "sAMAccountName", "Unknown")
if hasattr(current_user, "cn") and current_user.cn:
self.user = current_user.cn
elif hasattr(current_user, "username") and current_user.username:
self.user = current_user.username
elif hasattr(current_user, "sAMAccountName") and current_user.sAMAccountName:
self.user = current_user.sAMAccountName
else:
self.user = "Unknown"
def WriteLog(self, action, details=""): def add_log(self, action, details=""):
"""Log user actions with timestamp, user, action, and details.""" """Create/Add a log entry."""
with open(self.filepath, "a", encoding="utf-8") as f: with open(self.filepath, "a", encoding="utf-8") as f:
f.write( f.write(
f"Timestamp: {self.timestamp} | " f"Timestamp: {self.timestamp} | "
@@ -42,46 +32,73 @@ class LogData:
f"Details: {details}\n" f"Details: {details}\n"
) )
def get_all_logs(self):
def GetActivitiesLog(self): """Read all logs."""
logs = [] logs = []
if os.path.exists(self.filepath): if os.path.exists(self.filepath):
with open(self.filepath, 'r') as f: with open(self.filepath, 'r', encoding="utf-8") as f:
for line in f: for line in f:
parts = line.strip().split(" | ") parts = line.strip().split(" | ")
if len(parts) == 4: if len(parts) == 4:
logs.append({ logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(), "timestamp": parts[0].split(":", 1)[1].strip(),
"user": parts[1].replace("User:", "").strip(), "user": parts[1].split(":", 1)[1].strip(),
"action": parts[2].replace("Action:", "").strip(), "action": parts[2].split(":", 1)[1].strip(),
"details": parts[3].replace("Details:", "").strip() "details": parts[3].split(":", 1)[1].strip()
}) })
return logs return logs
def GetFilteredActivitiesLog(self, startDate, endDate, userName): def get_filtered_logs(self, start_date=None, end_date=None, user_name=None):
"""Filter logs by date and/or user."""
logs = self.get_all_logs()
filtered_logs = self.GetActivitiesLog() # Filter by date
if start_date or end_date:
start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else datetime.min
end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else datetime.max
logs = [
log for log in logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
# Date filter # Filter by username
if startDate or endDate: if user_name:
try: logs = [log for log in logs if user_name.lower() in log.get("user", "").lower()]
start_dt = datetime.strptime(startDate, "%Y-%m-%d") if startDate else datetime.min
end_dt = datetime.strptime(endDate, "%Y-%m-%d") if endDate else datetime.max return logs
def update_log(self, index, action=None, details=None):
"""Update a specific log entry by index (0-based)."""
logs = self.get_all_logs()
if 0 <= index < len(logs):
if action:
logs[index]["action"] = action
if details:
logs[index]["details"] = details
self._rewrite_logs_file(logs)
return True
return False
def delete_log(self, index):
"""Delete a specific log entry by index (0-based)."""
logs = self.get_all_logs()
if 0 <= index < len(logs):
logs.pop(index)
self._rewrite_logs_file(logs)
return True
return False
# ------------------- INTERNAL HELPER -------------------
def _rewrite_logs_file(self, logs):
"""Overwrite the log file with current logs."""
with open(self.filepath, "w", encoding="utf-8") as f:
for log in logs:
f.write(
f"Timestamp: {log['timestamp']} | "
f"User: {log['user']} | "
f"Action: {log['action']} | "
f"Details: {log['details']}\n"
)
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
except Exception as e:
print("Date filter error:", e)
#Why catching all exceptions? Need to handle specific exceptions
# Username filter
if userName:
filtered_logs = [log for log in filtered_logs if userName.lower() in log["user"].lower()]
return filtered_logs

View File

@@ -1,14 +1,7 @@
# return blocks
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user from model.Utilities import ResponseHandler, HtmlHelper, ItemCRUDType
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
import config import config
import mysql.connector import mysql.connector
from mysql.connector import Error
from model.ItemCRUD import ItemCRUD from model.ItemCRUD import ItemCRUD
@@ -19,103 +12,162 @@ class Village:
def __init__(self): def __init__(self):
self.isSuccess = False self.isSuccess = False
self.resultMessage = "" self.resultMessage = ""
self.village = ItemCRUD(itemType=ItemCRUDType.Village)
# 🔹 Helper: sync status
def _set_status(self, village):
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
# 🔹 Helper: get request data
def _get_form_data(self, request):
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
return block_id, village_name
def AddVillage(self, request): def AddVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village) block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id') if not village_name:
village_name = request.form.get('Village_Name', '').strip() self.isSuccess = False
self.resultMessage = "Village name cannot be empty"
return
village.AddItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlock", storedprocadd="SaveVillage" ) try:
self.isSuccess = village.isSuccess self.village.AddItem(
self.resultMessage = village.resultMessage request=request,
return parentid=block_id,
#self.isSuccess = False childname=village_name,
storedprocfetch="GetVillageByNameAndBlock",
storedprocadd="SaveVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllVillages(self, request): def GetAllVillages(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagesdata = village.GetAllData(request=request, storedproc="GetAllVillages")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return villagesdata
try:
villagesdata = self.village.GetAllData(
request=request,
storedproc="GetAllVillages"
)
self._set_status(self.village)
return villagesdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return []
def CheckVillage(self, request): def CheckVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village) block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip() if not village_name:
result = village.CheckItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlocks") self.isSuccess = False
self.isSuccess = village.isSuccess self.resultMessage = "Village name cannot be empty"
self.resultMessage = village.resultMessage return None
return result
try:
result = self.village.CheckItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlocks"
)
self._set_status(self.village)
return result
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def DeleteVillage(self, request, village_id): def DeleteVillage(self, request, village_id):
village = ItemCRUD(itemType=ItemCRUDType.Village) try:
self.village.DeleteItem(
request=request,
itemID=village_id,
storedprocDelete="DeleteVillage"
)
self._set_status(self.village)
village.DeleteItem(request=request, itemID=village_id, storedprocDelete="DeleteVillage" ) except Exception as e:
self.isSuccess = village.isSuccess self.isSuccess = False
self.resultMessage = village.resultMessage self.resultMessage = str(e)
return
def EditVillage(self, request, village_id): def EditVillage(self, request, village_id):
corsor=None block_id, village_name = self._get_form_data(request)
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id') if not village_name:
village_name = request.form.get('Village_Name', '').strip()
village.EditItem(request=request,childid=village_id,parentid=block_id,childname=village_name,storedprocupdate="UpdateVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
# def GetVillageByID(self, request, id):
# village = ItemCRUD(itemType=ItemCRUDType.Village)
# villagedetailsdata = village.GetAllData(request=request, storedproc="GetVillageDetailsById")
# self.isSuccess = village.isSuccess
# self.resultMessage = village.resultMessage
# return villagedetailsdata
def GetVillageByID(self, request, id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagedetailsdata = village.GetDataByID(id=id,storedproc="GetVillageDetailsById")
if villagedetailsdata:
self.isSuccess = True
else:
self.isSuccess = False self.isSuccess = False
self.resultMessage = "Village not found" self.resultMessage = "Village name cannot be empty"
return
return villagedetailsdata try:
self.village.EditItem(
request=request,
childid=village_id,
parentid=block_id,
childname=village_name,
storedprocupdate="UpdateVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllBlocks(self, request): def GetVillageByID(self, id):
try:
villagedetailsdata = self.village.GetDataByID(
id=id,
storedproc="GetVillageDetailsById"
)
if villagedetailsdata:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Village not found"
return villagedetailsdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def GetAllBlocks(self):
blocks = [] blocks = []
self.isSuccess = False self.isSuccess = False
self.resultMessage = "" self.resultMessage = ""
connection = config.get_db_connection()
connection = config.get_db_connection()
if not connection: if not connection:
return [] return []
cursor = connection.cursor()
try: try:
cursor.callproc('GetAllBlocks') with connection.cursor() as cursor:
for result in cursor.stored_results(): cursor.callproc('GetAllBlocks')
blocks = result.fetchall()
for result in cursor.stored_results():
blocks.extend(result.fetchall())
self.isSuccess = True self.isSuccess = True
return blocks
except mysql.connector.Error as e: except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}") print(f"Error fetching blocks: {e}")
self.isSuccess = False self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("block"), 500) self.resultMessage = HtmlHelper.json_response(
finally: ResponseHandler.fetch_failure("block"), 500
cursor.close() )
connection.close() return []
return blocks finally:
connection.close()

View File

@@ -1,150 +1,246 @@
import config # from flask import request
import mysql.connector # from model.ItemCRUD import ItemCRUD
# from model.Utilities import ItemCRUDType
class GSTReleasemodel: # class GSTRelease:
# """CRUD operations for GST Release using ItemCRUD"""
# def __init__(self):
# self.isSuccess = False
# self.resultMessage = ""
# # ------------------- Add GST Release -------------------
# def AddGSTRelease(self, request):
# pmc_no = request.form.get('PMC_No', '').strip()
# invoice_no = request.form.get('invoice_No', '').strip()
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.AddItem(
# request=request,
# parentid=None,
# childname=f"{pmc_no}-{invoice_no}",
# storedprocfetch="CheckGSTReleaseExists",
# storedprocadd="AddGSTReleaseFromExcel" # your stored procedure handles extra fields
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# # ------------------- Get All GST Releases -------------------
# def GetAllGSTReleases(self):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# # Pass request=None for fetch
# rows = gst.GetAllData(request=None, storedproc="GetAllGSTReleases")
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# data = []
# for row in rows:
# data.append({
# "gst_release_id": row[0],
# "pmc_no": row[1],
# "invoice_no": row[2],
# "basic_amount": row[3],
# "final_amount": row[4],
# "total_amount": row[5],
# "utr": row[6],
# "contractor_id": row[7]
# })
# return data
# # ------------------- Get GST Release By ID -------------------
# def GetGSTReleaseByID(self, gst_release_id):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# row = gst.GetDataByID(gst_release_id, request=None, storedproc="GetGSTReleaseById")
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# if row:
# return {
# "gst_release_id": row[0],
# "pmc_no": row[1],
# "invoice_no": row[2],
# "basic_amount": row[3],
# "final_amount": row[4],
# "total_amount": row[5],
# "utr": row[6],
# "contractor_id": row[7]
# }
# return None
# # ------------------- Edit GST Release -------------------
# def EditGSTRelease(self, request, gst_release_id):
# pmc_no = request.form.get('PMC_No', '').strip()
# invoice_no = request.form.get('invoice_No', '').strip()
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.EditItem(
# request=request,
# childid=gst_release_id,
# parentid=None,
# childname=f"{pmc_no}-{invoice_no}",
# storedprocupdate="UpdateGSTRelease" # stored procedure handles extra fields
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
# # ------------------- Delete GST Release -------------------
# def DeleteGSTRelease(self, gst_release_id):
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# gst.DeleteItem(
# itemID=gst_release_id,
# request=None,
# storedprocDelete="DeleteGSTReleaseById"
# )
# self.isSuccess = gst.isSuccess
# self.resultMessage = str(gst.resultMessage)
from flask import request, jsonify
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GSTRelease:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ------------------- Add GST Release -------------------
def AddGSTRelease(self, request):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
data = {
"PMC_No": request.form.get("PMC_No", "").strip(),
"Invoice_No": request.form.get("Invoice_No", "").strip(),
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
"UTR": request.form.get("UTR", "").strip(),
"Contractor_ID": int(request.form.get("Contractor_ID", 0) or 0)
}
gst.AddItem(
request=request,
data=data,
storedprocfetch="CheckGSTReleaseExists",
storedprocadd="AddGSTReleaseFromExcel"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in AddGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Edit GST Release -------------------
def EditGSTRelease(self, request, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
data = {
"PMC_No": request.form.get("PMC_No", "").strip(),
"Invoice_No": request.form.get("Invoice_No", "").strip(),
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
"UTR": request.form.get("UTR", "").strip()
}
gst.EditItem(
request=request,
childid=gst_release_id,
data=data,
storedprocupdate="UpdateGSTRelease"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in EditGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Delete GST Release -------------------
def DeleteGSTRelease(self, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst.DeleteItem(
request=None,
itemID=gst_release_id,
storedprocDelete="DeleteGSTReleaseById"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in DeleteGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Get All GST Releases -------------------
def GetAllGSTReleases(self):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
rows = gst.GetAllData(None, "GetAllGSTReleases")
data = []
for row in rows:
data.append({
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
})
return data
except Exception as e:
print("ERROR in GetAllGSTReleases:", e)
return []
# ------------------- Get GST Release By ID -------------------
def GetGSTReleaseByID(self, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
row = gst.GetDataByID(gst_release_id, "GetGSTReleaseById")
if row:
return {
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
}
@staticmethod
def get_connection():
connection = config.get_db_connection()
if not connection:
return None return None
return connection
@staticmethod except Exception as e:
def fetch_all_gst_releases(): print("ERROR in GetGSTReleaseByID:", e)
connection = GSTReleasemodel.get_connection() return None
gst_releases = []
if connection:
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetAllGSTReleases')
gst_releases = []
for result in cursor.stored_results(): # change to procedure
gst_releases = result.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching GST releases: {e}")
finally:
cursor.close()
connection.close()
return gst_releases
@staticmethod
def insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
# Insert into gst_release
cursor.callproc(
'InsertGSTReleaseOnly',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id]
)
# Insert into inpayment
cursor.callproc(
'InsertInpaymentOnly',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id]
)
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error inserting GST release: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def fetch_gst_release_by_id(gst_release_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return None
data = {}
try:
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetGSTReleaseById', [gst_release_id])
for result in cursor.stored_results():
data = result.fetchone()
if data:
# Convert to array for template
data = [
data.get('GST_Release_Id'),
data.get('PMC_No'),
data.get('Invoice_No'),
data.get('Basic_Amount'),
data.get('Final_Amount'),
data.get('Total_Amount'),
data.get('UTR')
]
except mysql.connector.Error as e:
print(f"Error fetching GST release by id: {e}")
finally:
cursor.close()
connection.close()
return data
@staticmethod
def update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr):
connection = GSTReleasemodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
# Update gst_release
cursor.callproc(
'UpdateGSTRelease',
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, gst_release_id]
)
# Update inpayment
cursor.callproc(
'UpdateInpaymentByUTR',
[basic_amount, final_amount, total_amount, utr]
)
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating GST release: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def delete_gst_release(gst_release_id):
connection = GSTReleasemodel.get_connection()
if not connection:
return False, None
try:
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetGSTReleaseUTRById', [gst_release_id])
record = None
for result in cursor.stored_results():
record = result.fetchone()
if not record:
return False, None
utr = record['UTR']
# Step 1: Delete gst_release
cursor.callproc('DeleteGSTReleaseById', [gst_release_id])
# Step 2: Reset inpayment using UTR
cursor.callproc('ResetInpaymentByUTR', [utr])
connection.commit()
return True, utr
except mysql.connector.Error as e:
print(f"Error deleting GST release: {e}")
return False, None
finally:
cursor.close()
connection.close()

View File

@@ -1,8 +1,13 @@
import config import config
import mysql.connector import mysql.connector
import config
import mysql.connector
from enum import Enum
from model.Utilities import ItemCRUDType
class Paymentmodel: class Paymentmodel:
# ---------------- Database Connection ----------------
@staticmethod @staticmethod
def get_connection(): def get_connection():
connection = config.get_db_connection() connection = config.get_db_connection()
@@ -10,6 +15,7 @@ class Paymentmodel:
return None return None
return connection return connection
# ---------------- Payment Methods ----------------
@staticmethod @staticmethod
def fetch_all_payments(): def fetch_all_payments():
connection = Paymentmodel.get_connection() connection = Paymentmodel.get_connection()
@@ -52,14 +58,8 @@ class Paymentmodel:
try: try:
cursor = connection.cursor() cursor = connection.cursor()
cursor.callproc('UpdateInpaymentRecord', [ cursor.callproc('UpdateInpaymentRecord', [
subcontractor_id, subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr
pmc_no, ])
invoice_no,
amount,
tds_amount,
total_amount,
utr
])
connection.commit() connection.commit()
return True return True
except mysql.connector.Error as e: except mysql.connector.Error as e:
@@ -80,7 +80,6 @@ class Paymentmodel:
cursor.callproc("GetPaymentById", (payment_id,)) cursor.callproc("GetPaymentById", (payment_id,))
for result in cursor.stored_results(): for result in cursor.stored_results():
payment_data = result.fetchone() payment_data = result.fetchone()
# Convert to array for template
if payment_data: if payment_data:
payment_data = [ payment_data = [
payment_data.get('Payment_Id'), payment_data.get('Payment_Id'),
@@ -117,42 +116,99 @@ class Paymentmodel:
@staticmethod @staticmethod
def delete_payment(payment_id): def delete_payment(payment_id):
"""
Deletes a payment and resets the related inpayment fields in one go.
Returns (success, pmc_no, invoice_no)
"""
connection = Paymentmodel.get_connection() connection = Paymentmodel.get_connection()
if not connection: if not connection:
return False, None, None return False, None, None
try: try:
cursor = connection.cursor(dictionary=True) cursor = connection.cursor(dictionary=True)
# Fetch PMC & Invoice before deleting
cursor.callproc('GetPaymentPMCInvoiceById', [payment_id]) cursor.callproc('GetPaymentPMCInvoiceById', [payment_id])
record = {} record = {}
for result in cursor.stored_results(): for result in cursor.stored_results():
record = result.fetchone() or {} record = result.fetchone() or {}
if not record: if not record:
return False, None, None return False, None, None
pmc_no = record['PMC_No'] pmc_no = record['PMC_No']
invoice_no = record['Invoice_No'] invoice_no = record['Invoice_No']
# Delete payment
# Step 2: Delete the payment using the stored procedure
cursor.callproc("DeletePayment", (payment_id,)) cursor.callproc("DeletePayment", (payment_id,))
connection.commit() connection.commit()
# Reset inpayment fields
# Step 3: Reset inpayment fields using the stored procedure
cursor.callproc("ResetInpayment", [pmc_no, invoice_no]) cursor.callproc("ResetInpayment", [pmc_no, invoice_no])
connection.commit() connection.commit()
return True, pmc_no, invoice_no return True, pmc_no, invoice_no
except mysql.connector.Error as e: except mysql.connector.Error as e:
print(f"Error deleting payment: {e}") print(f"Error deleting payment: {e}")
return False, None, None return False, None, None
finally:
cursor.close()
connection.close()
# ---------------- Item CRUD Methods ----------------
@staticmethod
def fetch_items(item_type: ItemCRUDType):
connection = Paymentmodel.get_connection()
items = []
if connection:
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetItemsByType', [item_type.value])
for result in cursor.stored_results():
items = result.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching {item_type.name}: {e}")
finally:
cursor.close()
connection.close()
return items
@staticmethod
def insert_item(item_type: ItemCRUDType, name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('InsertItem', [item_type.value, name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error inserting {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def update_item(item_type: ItemCRUDType, item_id: int, new_name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('UpdateItem', [item_type.value, item_id, new_name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def delete_item(item_type: ItemCRUDType, item_id: int):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('DeleteItem', [item_type.value, item_id])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error deleting {item_type.name}: {e}")
return False
finally: finally:
cursor.close() cursor.close()
connection.close() connection.close()

View File

@@ -1,5 +1,6 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<title>Activity Logs</title> <title>Activity Logs</title>
@@ -9,67 +10,79 @@
background-color: #f8f9fa; background-color: #f8f9fa;
margin: 20px; margin: 20px;
} }
h2 { h2 {
text-align: center; text-align: center;
margin-bottom: 20px; margin-bottom: 20px;
} }
form { form {
display: flex; display: flex;
gap: 10px; gap: 10px;
justify-content: center; justify-content: center;
margin-bottom: 20px; margin-bottom: 20px;
flex-wrap: wrap;
} }
input, button {
input,
button {
padding: 8px; padding: 8px;
border-radius: 6px; border-radius: 6px;
border: 1px solid #ccc; border: 1px solid #ccc;
} }
button { button {
background-color: #007bff; background-color: #007bff;
color: white; color: white;
cursor: pointer; cursor: pointer;
} }
button:hover { button:hover {
background-color: #0056b3; background-color: #0056b3;
} }
table { table {
width: 90%; width: 90%;
margin: auto; margin: auto;
border-collapse: collapse; border-collapse: collapse;
background: white; background: white;
box-shadow: 0 0 8px rgba(0,0,0,0.1); box-shadow: 0 0 8px rgba(0, 0, 0, 0.1);
} }
th, td {
th,
td {
padding: 12px; padding: 12px;
border: 1px solid #ddd; border: 1px solid #ddd;
text-align: center; text-align: center;
} }
th { th {
background-color: #007bff; background-color: #007bff;
color: white; color: white;
} }
tr:nth-child(even) { tr:nth-child(even) {
background-color: #f2f2f2; background-color: #f2f2f2;
} }
</style> </style>
</head> </head>
<body> <body>
<h2>Activity Logs</h2> <h2>Activity Logs</h2>
<form method="get" action="{{ url_for('activity_log') }}" class="filter-form">
<label for="username">Username:</label> <form method="get" action="{{ url_for('log.activity_log') }}" class="filter-form">
<input type="text" id="username" name="username" placeholder="Enter username" value="{{ username or '' }}"> <label for="username">Username:</label>
<input type="text" id="username" name="username" placeholder="Enter username" value="{{ username or '' }}">
<label for="start_date">Start Date:</label> <label for="start_date">Start Date:</label>
<input type="date" id="start_date" name="start_date" value="{{ start_date or '' }}"> <input type="date" id="start_date" name="start_date" value="{{ start_date or '' }}">
<label for="end_date">End Date:</label> <label for="end_date">End Date:</label>
<input type="date" id="end_date" name="end_date" value="{{ end_date or '' }}"> <input type="date" id="end_date" name="end_date" value="{{ end_date or '' }}">
<button type="submit" class="btn btn-primary">Filter</button>
<button type="submit" class="btn btn-primary">Filter</button> <button type="button" style="background-color: #6c757d;" onclick="resetFilter()">Reset</button>
<!-- <button type="button" style="background-color: #6c757d;" onclick="resetFilter()">Reset</button> --> </form>
</form>
<table> <table>
<tr> <tr>
@@ -94,9 +107,10 @@
</table> </table>
<script> <script>
function resetFilter() { function resetFilter() {
window.location.href = "{{ url_for('activity_log') }}"; window.location.href = "{{ url_for('log.activity_log') }}";
} }
</script> </script>
</body> </body>
</html>
</html>