from decimal import Decimal from AppCode import ContractorInfo, Auth, Utilities, Log from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper from AppCode.Auth import LoginLDAP, User from AppCode.Log import LogData, LogHelper from AppCode.State import State from AppCode.District import District from AppCode.Block import Block from AppCode.Village import Village # need to optimize above import lines from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json from flask import current_app, session, send_file from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user import mysql.connector from mysql.connector import Error import config import os import re import ast from datetime import datetime import openpyxl from openpyxl import load_workbook from openpyxl.styles import Font, PatternFill, Alignment from openpyxl.utils import get_column_letter import logging import pandas as pd #import AppRoutes.StateRoute # this is server app = Flask(__name__) login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' @login_manager.user_loader def load_user(user_id): return User(user_id) #need to check and understand above function app.secret_key = '9f2a1b8c4d6e7f0123456789abcdef01' #Shouldnt be hardcoded # this is Index page OR Home page.. @app.route('/') @login_required def index(): return render_template('index.html') # ---------------- LOGIN ROUTE ---------------- @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': loginData = LoginLDAP(request) # If bind successful → set session and log if loginData.isValidLogin: if loginData.isDefaultCredentials: LogHelper.log_action('Login', f"User {loginData.username} logged in (static user)") else: LogHelper.log_action('Login', f"User {loginData.username} logged in (LDAP)") session['username'] = loginData.username login_user(User(loginData.username)) return redirect(url_for('index', login='success')) else: flash(loginData.errorMessage, 'danger') return render_template('login.html') @app.route('/logout') @login_required def logout(): LogHelper.log_action('Logout', f"User {current_user.id} logged out") # log the event logout_user() flash('You have been logged out.', 'info') return redirect(url_for('login')) @app.route('/activity_log', methods=['GET', 'POST']) @login_required def activity_log(): # Filters (GET or POST) start_date = request.values.get("start_date") end_date = request.values.get("end_date") user_name = request.values.get("username") logData = LogData() filtered_logs = logData.GetFilteredActivitiesLog(start_date,end_date,user_name) return render_template( "activity_log.html", logs=filtered_logs, start_date=start_date, end_date=end_date, username=user_name ) # ------------------------- State controller ------------------------------------------ @app.route('/add_state', methods=['GET', 'POST']) @login_required def add_state(): state = State() if request.method == 'POST': state.AddState(request=request) return state.resultMessage statedata = state.GetAllStates(request=request) return render_template('add_state.html', statedata=statedata) # AJAX route to check state existence @app.route('/check_state', methods=['POST']) @login_required def check_state(): state = State() return state.CheckState(request=request) # Delete State @app.route('/delete_state/', methods=['GET']) @login_required def deleteState(id): state = State() msg = state.DeleteState(request=request, id=id) if not state.isSuccess: return state.resultMessage else: return redirect(url_for('add_state')) # Edit State @app.route('/edit_state/', methods=['GET', 'POST']) @login_required def editState(id): connection = config.get_db_connection() cursor = connection.cursor() state = State() statedata = [] if request.method == 'POST': state.EditState(request=request, id=id) if state.isSuccess: return redirect(url_for('add_state')) else: return state.resultMessage else: statedata = state.GetStateByID(request=request, id=id) if not state.isSuccess: return state.resultMessage if statedata is None: statedata = [] return render_template('edit_state.html', state=statedata) # -------- end State controller ----------- # ------------------------- District controller ------------------------------------------ @app.route('/add_district', methods=['GET', 'POST']) @login_required def add_district(): district = District() if request.method == 'POST': district.AddDistrict(request=request) return district.resultMessage state = State() states = state.GetAllStates(request=request) districtdata = district.GetAllDistricts(request=request) return render_template('add_district.html', districtdata=districtdata, states=states) # AJAX route to check district existence @app.route('/check_district', methods=['POST']) @login_required def check_district(): district = District() return district.CheckDistrict(request=request) # Delete District @app.route('/delete_district/', methods=['GET']) @login_required def delete_district(district_id): district = District() district.DeleteDistrict(request=request, id=district_id) if not district.isSuccess: return district.resultMessage else: return redirect(url_for('add_district')) # Edit District @app.route('/edit_district/', methods=['GET', 'POST']) @login_required def edit_district(district_id): district = District() if request.method == 'POST': district.EditDistrict(request=request, id=district_id) if district.isSuccess: return redirect(url_for('add_district')) else: flash(district.resultMessage, "error") districtdata = district.GetDistrictByID(request=request, id=district_id) state = State() states = state.GetAllStates(request=request) return render_template('edit_district.html', districtdata=districtdata, states=states) # GET Request else: districtdata = district.GetDistrictByID(request=request, id=district_id) if not district.isSuccess: flash(district.resultMessage, "error") return redirect(url_for('add_district')) state = State() states = state.GetAllStates(request=request) if districtdata is None: districtdata = [] if states is None: states = [] return render_template('edit_district.html', districtdata=districtdata, states=states) # --------- end District controller ------------- # ------------------------- Block controller ------------------------------------------ @app.route('/add_block', methods=['GET', 'POST']) @login_required def add_block(): block = Block() district = District() # form submission if request.method == 'POST': block.AddBlock(request) return block.resultMessage # Fetch all states connection = config.get_db_connection() cursor = connection.cursor() cursor.callproc("GetAllStates") for rs in cursor.stored_results(): states = rs.fetchall() # Fetch all blocks block_data = block.GetAllBlocks() return render_template('add_block.html', states=states, block_data=block_data) @app.route('/check_block', methods=['POST']) @login_required def check_block(): block = Block() return block.CheckBlock(request) @app.route('/edit_block/', methods=['GET', 'POST']) @login_required def edit_block(block_id): block = Block() if request.method == 'POST': return block.EditBlock(request, block_id) # Load all states connection = config.get_db_connection() cursor = connection.cursor() cursor.callproc("GetAllStates") for rs in cursor.stored_results(): states = rs.fetchall() # Load all districts cursor.callproc("GetAllDistrictsData") for rs in cursor.stored_results(): districts = rs.fetchall() block_data = block.GetBlockByID(block_id) return render_template('edit_block.html', block_data=block_data, states=states, districts=districts) @app.route('/delete_block/') @login_required def delete_block(block_id): block = Block() block.DeleteBlock(block_id) return redirect(url_for('add_block')) # get block by district id @app.route('/get_blocks/', methods=['GET']) @login_required def get_blocks(district_id): connection = config.get_db_connection() cursor = connection.cursor() blocks = [] try: # cursor.execute("SELECT Block_Id, Block_Name FROM blocks WHERE District_id = %s", (district_id,)) # blocks = cursor.fetchall() cursor.callproc("GetBlocksByDistrict", (district_id,)) for rs in cursor.stored_results(): blocks = rs.fetchall() # log_action("Get blocks", f"User {current_user.id} Get Blocks '{district_id}'") except mysql.connector.Error as e: print(f"Error fetching blocks: {e}") return HtmlHelper.json_response({"error": "Failed to fetch blocks"}, 500) finally: cursor.close() connection.close() return jsonify({"blocks": [{"Block_Id": block[0], "Block_Name": block[1]} for block in blocks]}) # this is get district all data by using state id .. @app.route('/get_districts/', methods=['GET']) @login_required def get_districts(state_id): connection = config.get_db_connection() districts = [] if connection: cursor = connection.cursor() try: # cursor.execute("SELECT District_id, District_Name FROM districts WHERE State_Id = %s", (state_id,)) # districts = cursor.fetchall() cursor.callproc("GetDistrictsByStateId", (state_id,)) for dis in cursor.stored_results(): districts = dis.fetchall() LogHelper.log_action("Get District", f"User {current_user.id} Get District '{state_id}'") except mysql.connector.Error as e: print(f"Error fetching districts: {e}") return HtmlHelper.json_response(ResponseHandler.fetch_failure("districts"), 500) finally: cursor.close() connection.close() return jsonify({ "districts": [{"District_id": d[0], "District_Name": d[1]} for d in districts] }) # ----------- end Block controller ----------------- # ------------------------- Village controller ------------------------------------------ # Route to add a village @app.route('/add_village', methods=['GET', 'POST']) @login_required def add_village(): village = Village() if request.method == 'POST': village.AddVillage(request=request) return village.resultMessage state = State() # Use the State class to get states states = state.GetAllStates(request=request) villages = village.GetAllVillages(request=request) return render_template('add_village.html', states=states, villages=villages) @app.route('/check_village', methods=['POST']) @login_required def check_village(): village = Village() return village.CheckVillage(request=request) # Delete Village @app.route('/delete_village/', methods=['GET']) @login_required def delete_village(village_id): village = Village() village.DeleteVillage(request=request, village_id=village_id) if not village.isSuccess: flash(village.resultMessage, "error") return redirect(url_for('add_village')) else: return redirect(url_for('add_village')) # Edit Village @app.route('/edit_village/', methods=['GET', 'POST']) @login_required def edit_village(village_id): village = Village() if request.method == 'POST': village.EditVillage(request=request, village_id=village_id) if village.isSuccess: flash(village.resultMessage, "success") return redirect(url_for('add_village')) else: flash(village.resultMessage, "error") village_data = village.GetVillageByID(request=request, id=village_id) blocks = village.GetAllBlocks(request=request) return render_template('edit_village.html', village_data=village_data, blocks=blocks) else: village_data = village.GetVillageByID(request=request, id=village_id) if not village.isSuccess: flash(village.resultMessage, "error") return redirect(url_for('add_village')) blocks = village.GetAllBlocks(request=request) if village_data is None: village_data = [] # Ensure it's iterable in template if blocks is None: blocks = [] return render_template('edit_village.html', village_data=village_data, blocks=blocks) # ---- end Village controller --------------------- # -------------------------------- Invoice controller ------------------------------------------ @app.route('/add_invoice', methods=['GET', 'POST']) @login_required def add_invoice(): connection = config.get_db_connection() if not connection: return jsonify({"status": "error", "message": "Database connection failed"}), 500 if request.method == 'POST': try: cursor = connection.cursor(dictionary=True) # Get the village name from the form village_name = request.form.get('village') print("village name", village_name) # Query the database to get the corresponding Village_Id based on the village name # cursor.execute("SELECT Village_Id FROM villages WHERE Village_Name = %s", (village_name,)) # village_result = cursor.fetchone() cursor.callproc("GetVillageIdByName", (village_name,)) for rs in cursor.stored_results(): village_result = rs.fetchone() if not village_result: return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400 village_id = village_result['Village_Id'] # Fetch form data pmc_no = request.form.get('pmc_no') work_type = request.form.get('work_type') invoice_details = request.form.get('invoice_details') invoice_date = request.form.get('invoice_date') invoice_no = request.form.get('invoice_no') basic_amount = request.form.get('basic_amount') basic_amount=float(basic_amount) if basic_amount else 0.0 debit_amount = request.form.get('debit_amount') debit_amount=float(debit_amount) if debit_amount else 0.0 after_debit_amount = request.form.get('after_debit_amount') after_debit_amount=float(after_debit_amount) if after_debit_amount else 0.0 amount = request.form.get('amount') amount=float(amount) if amount else 0.0 gst_amount = request.form.get('gst_amount') gst_amount=float(gst_amount) if gst_amount else 0.0 tds_amount = request.form.get('tds_amount') tds_amount=float(tds_amount) if tds_amount else 0.0 sd_amount = request.form.get('sd_amount') sd_amount=float(sd_amount) if sd_amount else 0.0 on_commission = request.form.get('on_commission') on_commission=float(on_commission) if on_commission else 0.0 hydro_testing = request.form.get('hydro_testing') hydro_testing=float(hydro_testing) if hydro_testing else 0.0 gst_sd_amount = request.form.get('gst_sd_amount') gst_sd_amount=float(gst_sd_amount) if gst_sd_amount else 0.0 final_amount = request.form.get('final_amount') final_amount=float(final_amount) if final_amount else 0.0 # insert_invoice_query = ''' # INSERT INTO invoice ( # PMC_No, Village_Id, Work_Type, Invoice_Details, Invoice_Date, Invoice_No, # Basic_Amount, Debit_Amount, After_Debit_Amount, Amount, GST_Amount, TDS_Amount, # SD_Amount, On_Commission, Hydro_Testing, GST_SD_Amount, Final_Amount # ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) # ''' # invoice_values = ( # pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no, # basic_amount, debit_amount, after_debit_amount, amount, gst_amount, tds_amount, # sd_amount, on_commission, hydro_testing, gst_sd_amount, final_amount # ) # cursor.execute(insert_invoice_query, invoice_values) # connection.commit() # invoice_id = cursor.lastrowid cursor.callproc('InsertInvoice', [ pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no, basic_amount, debit_amount, after_debit_amount, amount, gst_amount, tds_amount, sd_amount, on_commission, hydro_testing, gst_sd_amount, final_amount]) LogHelper.log_action("Add invoice", f"User {current_user.id} Added invoice '{ pmc_no}'") for result in cursor.stored_results(): invoice_id = result.fetchone()['invoice_id'] connection.commit() print("This is the invocie id from the invoice table ", invoice_id) # Insert into assign_subcontractors table # subcontractor_id = request.form.get('subcontractor_id') # insert_assign_query = ''' # INSERT INTO assign_subcontractors (PMC_no, Contractor_Id, Village_Id) # VALUES (%s, %s, %s) # ''' # cursor.execute(insert_assign_query, (pmc_no, subcontractor_id, village_id)) # connection.commit() subcontractor_id = request.form.get('subcontractor_id') cursor.callproc('AssignSubcontractor', [pmc_no, subcontractor_id, village_id]) connection.commit() # Insert Hold Amounts into invoice_subcontractor_hold_join table hold_types = request.form.getlist('hold_type[]') hold_amounts = request.form.getlist('hold_amount[]') hold_count = 0 for hold_type, hold_amount in zip(hold_types, hold_amounts): # cursor.execute("SELECT hold_type_id FROM hold_types WHERE hold_type = %s", (hold_type,)) # hold_type_result = cursor.fetchone() cursor.callproc('GetHoldTypeIdByName', [hold_type]) for result in cursor.stored_results(): hold_type_result = result.fetchone() print("hold type from invoice ", hold_type_result) if not hold_type_result: return jsonify({"status": "error", "message": f"Invalid Hold Type: {hold_type}"}), 400 hold_type_id = hold_type_result['hold_type_id'] # insert_hold_query = ''' # INSERT INTO invoice_subcontractor_hold_join (Contractor_Id, Invoice_Id, hold_type_id, hold_amount) # VALUES (%s, %s, %s, %s) # ''' # cursor.execute(insert_hold_query, (subcontractor_id, invoice_id, hold_type_id, hold_amount)) # hold_count += 1 # connection.commit() cursor.callproc('InsertInvoiceSubcontractorHold', [ subcontractor_id, invoice_id, hold_type_id, hold_amount ]) connection.commit() hold_count += 1 print("Hold count from the invoice", hold_count) connection.commit() return jsonify({"status": "success", "message": "Invoice added successfully"}), 201 except mysql.connector.Error as e: connection.rollback() return jsonify({"status": "error", "message": f"Failed to add invoice: {str(e)}"}), 500 finally: cursor.close() connection.close() # GET request: fetch and display all invoices (all fields) along with the form try: cursor = connection.cursor(dictionary=True) # cursor.execute("SELECT * FROM view_invoice_details") # invoices = cursor.fetchall() cursor.callproc('GetAllInvoiceDetails') for result in cursor.stored_results(): invoices = result.fetchall() villages = [] cursor.callproc("GetAllVillages") for result in cursor.stored_results(): villages = result.fetchall() except mysql.connector.Error as e: print(f"Error: {e}") invoices = [] finally: cursor.close() connection.close() return render_template('add_invoice.html', invoices=invoices, villages=villages) # search subcontraactor to assing invoice @app.route('/search_subcontractor', methods=['POST']) @login_required def search_subcontractor(): connection = config.get_db_connection() if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("database connection"), 500) sub_query = request.form.get("query") try: cursor = connection.cursor(dictionary=True) # cursor.execute( # "SELECT Contractor_Id, Contractor_Name FROM subcontractors WHERE Contractor_Name LIKE %s", # (f"%{sub_query}%",) # ) # results = cursor.fetchall() cursor.callproc('SearchContractorsByName', [sub_query]) for result in cursor.stored_results(): results = result.fetchall() print(results) if not results: return "
  • No subcontractor found
  • " output = "".join( f"
  • {row['Contractor_Name']}
  • " for row in results ) print("Ajax Call for subcontractor", output) return output except mysql.connector.Error as e: return HtmlHelper.json_response(ResponseHandler.fetch_failure(f"Search failed: {str(e)}"), 500) finally: cursor.close() connection.close() # get hold types @app.route('/get_hold_types', methods=['GET']) @login_required def get_hold_types(): connection = config.get_db_connection() try: cursor = connection.cursor(dictionary=True) # cursor.execute("SELECT hold_type_id, hold_type FROM hold_types") # hold_types = cursor.fetchall() cursor.callproc("GetAllHoldTypes") for hold in cursor.stored_results(): hold_types = hold.fetchall() LogHelper.log_action("Get hold type", f"User {current_user.id} Get hold type'{ hold_types}'") return jsonify(hold_types) except mysql.connector.Error as e: return ResponseHandler.fetch_failure({str(e)}), 500 # return jsonify({"status": "error", "message": f"Failed to fetch hold types: {str(e)}"}), 500 finally: cursor.close() connection.close() # update invoice by id @app.route('/edit_invoice/', methods=['GET', 'POST']) @login_required def edit_invoice(invoice_id): connection = config.get_db_connection() if not connection: return jsonify({"status": "error", "message": "Database connection failed"}), 500 cursor = connection.cursor(dictionary=True) if request.method == 'POST': try: # Fetch updated form data subcontractor_id = request.form.get('subcontractor_id', '').strip() subcontractor_id = int(subcontractor_id) if subcontractor_id else None village_name = request.form.get('village') # cursor.execute("SELECT Village_Id FROM villages WHERE Village_Name = %s", (village_name,)) # village_result = cursor.fetchone() cursor.callproc("GetVillageIdByName", (village_name,)) for rs in cursor.stored_results(): village_result = rs.fetchone() if not village_result: return jsonify({"status": "error", "message": "Invalid Village Name"}), 400 village_id = village_result['Village_Id'] pmc_no = request.form.get('pmc_no') work_type = request.form.get('work_type') invoice_details = request.form.get('invoice_details') invoice_date = request.form.get('invoice_date') invoice_no = request.form.get('invoice_no') LogHelper.log_action("Edit invoice", f"User {current_user.id} Edit invoice'{ invoice_id}'") # Convert numeric fields properly numeric_fields = { "basic_amount": request.form.get('basic_amount'), "debit_amount": request.form.get('debit_amount'), "after_debit_amount": request.form.get('after_debit_amount'), "amount": request.form.get('amount'), "gst_amount": request.form.get('gst_amount'), "tds_amount": request.form.get('tds_amount'), "sd_amount": request.form.get('sd_amount'), "on_commission": request.form.get('on_commission'), "hydro_testing": request.form.get('hydro_testing'), "gst_sd_amount": request.form.get('gst_sd_amount'), "final_amount": request.form.get('final_amount'), } numeric_fields = {k: float(v) if v else 0 for k, v in numeric_fields.items()} # # Update invoice # update_invoice_query = ''' # UPDATE invoice # SET PMC_No=%s, Village_Id=%s, Work_Type=%s, Invoice_Details=%s, Invoice_Date=%s, # Invoice_No=%s, Basic_Amount=%s, Debit_Amount=%s, After_Debit_Amount=%s, # Amount=%s, GST_Amount=%s, TDS_Amount=%s, SD_Amount=%s, On_Commission=%s, # Hydro_Testing=%s, GST_SD_Amount=%s, Final_Amount=%s # WHERE Invoice_Id=%s # ''' # invoice_values = ( # pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no, # *numeric_fields.values(), invoice_id # ) # cursor.execute(update_invoice_query, invoice_values) # connection.commit() cursor.callproc('UpdateInvoice', [ pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no, *numeric_fields.values(), invoice_id ]) connection.commit() # Handle holds hold_types = request.form.getlist('hold_type[]') hold_amounts = request.form.getlist('hold_amount[]') for hold_type, hold_amount in zip(hold_types, hold_amounts): if not hold_type: continue # skip empty hold types # Get or insert hold type # cursor.execute("SELECT hold_type_id FROM hold_types WHERE hold_type = %s", (hold_type,)) # hold_type_result = cursor.fetchone() cursor.callproc('GetHoldTypeIdByName', [hold_type]) for result in cursor.stored_results(): hold_type_result = result.fetchone() # if not hold_type_result: # cursor.execute("INSERT INTO hold_types (hold_type) VALUES (%s)", (hold_type,)) # connection.commit() # hold_type_id = cursor.lastrowid # else: # hold_type_id = hold_type_result['hold_type_id'] if not hold_type_result: # Call stored procedure to insert and return new ID cursor.callproc('InsertHoldType', [hold_type, 0]) for result in cursor.stored_results(): pass # advance past any results cursor.execute("SELECT @_InsertHoldType_1") hold_type_id = cursor.fetchone()[0] print("if not hold type result anish:", hold_type_id) else: hold_type_id = hold_type_result['hold_type_id'] print("if hold type result anish:", hold_type_id) hold_amount = float(hold_amount) if hold_amount else 0 # Check if join exists # cursor.execute(""" # SELECT join_id FROM invoice_subcontractor_hold_join # WHERE Invoice_Id = %s AND Contractor_Id = %s AND hold_type_id = %s # """, (invoice_id, subcontractor_id, hold_type_id)) # join_result = cursor.fetchone() cursor.callproc('GetHoldJoinId', [invoice_id, subcontractor_id, hold_type_id]) for result in cursor.stored_results(): join_result = result.fetchone() if join_result: # cursor.execute(""" # UPDATE invoice_subcontractor_hold_join # SET hold_amount = %s # WHERE join_id = %s # """, (hold_amount, join_result['join_id'])) cursor.callproc('UpdateHoldAmountByJoinId', [hold_amount, join_result['join_id']]) connection.commit() else: # cursor.execute(""" # INSERT INTO invoice_subcontractor_hold_join (Contractor_Id, Invoice_Id, hold_type_id, hold_amount) # VALUES (%s, %s, %s, %s) # """, (subcontractor_id, invoice_id, hold_type_id, hold_amount)) cursor.callproc('InsertInvoiceSubcontractorHold', [ subcontractor_id, invoice_id, hold_type_id, hold_amount ]) connection.commit() connection.commit() return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200 except mysql.connector.Error as e: connection.rollback() return jsonify({"status": "error", "message": f"Failed to update invoice: {str(e)}"}), 500 finally: cursor.close() connection.close() # ------------------ GET Request ------------------ try: # Fetch invoice data # cursor.execute( # """SELECT i.*, s.Contractor_Name, v.Village_Name # FROM invoice i # LEFT JOIN assign_subcontractors a ON i.PMC_No = a.PMC_no AND i.Village_Id = a.Village_Id # LEFT JOIN subcontractors s ON a.Contractor_Id = s.Contractor_Id # LEFT JOIN villages v ON i.Village_Id = v.Village_Id # WHERE i.Invoice_Id = %s""", (invoice_id,) # ) # invoice = cursor.fetchone() cursor.callproc('GetInvoiceDetailsById', [invoice_id]) for result in cursor.stored_results(): invoice = result.fetchone() if not invoice: return jsonify({"status": "error", "message": "Invoice not found"}), 404 # Important! Clear unread result issue while cursor.nextset(): pass # Fetch hold amounts # cursor.execute( # """SELECT h.hold_type, ihj.hold_amount # FROM invoice_subcontractor_hold_join ihj # JOIN hold_types h ON ihj.hold_type_id = h.hold_type_id # WHERE ihj.Invoice_Id = %s""", (invoice_id,) # ) # hold_amounts = cursor.fetchall() # invoice["hold_amounts"] = hold_amounts cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id]) for result in cursor.stored_results(): hold_amounts = result.fetchall() invoice["hold_amounts"] = hold_amounts except mysql.connector.Error as e: return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500 finally: cursor.close() connection.close() return render_template('edit_invoice.html', invoice=invoice) # delete invoice by id @app.route('/delete_invoice/', methods=['GET']) @login_required def delete_invoice(invoice_id): connection = config.get_db_connection() if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("invoice"), 500) try: cursor = connection.cursor() # cursor.execute("DELETE FROM invoice WHERE Invoice_Id = %s", (invoice_id,)) cursor.callproc("DeleteInvoice", (invoice_id,)) LogHelper.log_action("Delete invoice", f"User {current_user.id} Delete invoice'{ invoice_id}'") connection.commit() # Check if the invoice was actually deleted if cursor.rowcount == 0: return HtmlHelper.json_response(ResponseHandler.fetch_failure("invoice"), 404) return redirect(url_for('add_invoice')) except mysql.connector.Error as e: print("Error deleting invoice:", e) return HtmlHelper.json_response(ResponseHandler.delete_failure("invoice"), 500) finally: cursor.close() connection.close() # ---------- end Invoice controller ------------------ # ----------------------------- Payment controller ------------------------------------------ # this is Payment Page to add data # @app.route('/add_payment', methods=['GET', 'POST']) # def add_payment(): # connection = config.get_db_connection() # payments = [] # List to hold payment history # # if not connection: # return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500) # # try: # cursor = connection.cursor() # # # Retrieve payment history # # cursor.execute( # # "SELECT Payment_Id, PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR FROM payment" # # ) # # payments = cursor.fetchall() # cursor.callproc("GetAllPayments") # for result in cursor.stored_results(): # payments = result.fetchall() # # except mysql.connector.Error as e: # print(f"Error fetching payment history: {e}") # return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500) # finally: # cursor.close() # # if request.method == 'POST': # pmc_no = request.form['PMC_No'] # invoice_no = request.form['invoice_No'] # amount = request.form['Payment_Amount'] # tds_amount = request.form['TDS_Payment_Amount'] # total_amount = request.form['total_amount'] # utr = request.form['utr'] # # try: # cursor = connection.cursor() # cursor.callproc('SavePayment', ( # pmc_no, invoice_no, amount, tds_amount, total_amount, utr # )) # connection.commit() # return redirect(url_for('add_payment')) # Redirect to add_payment page to reload the form # except mysql.connector.Error as e: # print(f"Error inserting payment: {e}") # return HtmlHelper.json_response(ResponseHandler.add_failure("payment"), 500) # finally: # cursor.close() # connection.close() # # return render_template('add_payment.html', payments=payments) @app.route('/add_payment', methods=['GET', 'POST']) @login_required def add_payment(): connection = config.get_db_connection() payments = [] if connection: cursor = connection.cursor() try: # cursor.execute( # "SELECT Payment_Id, PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR FROM payment" # ) # payments = cursor.fetchall() cursor.callproc('GetAllPayments') for result in cursor.stored_results(): payments = result.fetchall() except mysql.connector.Error as e: print(f"Error fetching payment history: {e}") return "Failed to fetch payment history", 500 finally: cursor.close() if request.method == 'POST': pmc_no = request.form['PMC_No'] invoice_no = request.form['invoice_No'] amount = request.form['Payment_Amount'] tds_amount = request.form['TDS_Payment_Amount'] total_amount = request.form['total_amount'] utr = request.form['utr'] LogHelper.log_action("Add Payment", f"User {current_user.id} Add Payment'{ pmc_no}'") try: cursor = connection.cursor() # cursor.execute('''INSERT INTO payment (PMC_No, invoice_no, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR) # VALUES (%s, %s, %s, %s, %s, %s)''', # (pmc_no, invoice_no, amount, tds_amount, total_amount, utr)) # connection.commit() cursor.callproc('InsertPayments', [ pmc_no, invoice_no, amount, tds_amount, total_amount, utr ]) connection.commit() return redirect(url_for('add_payment')) except mysql.connector.Error as e: print(f"Error inserting payment: {e}") return "Failed to add payment", 500 finally: cursor.close() connection.close() return render_template('add_payment.html', payments=payments) @app.route('/get_pmc_nos_by_subcontractor/') @login_required def get_pmc_nos_by_subcontractor(subcontractorId): connection = config.get_db_connection() cur = connection.cursor() print(subcontractorId) # query = """ # SELECT DISTINCT i.PMC_No # FROM invoice i # JOIN assign_subcontractors a ON i.PMC_No = a.PMC_no # JOIN subcontractors s ON a.Contractor_Id = s.Contractor_Id # WHERE s.Contractor_Id=%s; # """ # cur.execute(query, (subcontractorId,)) # results = cur.fetchall() cur.callproc('GetDistinctPMCNoByContractorId', [subcontractorId]) for result in cur.stored_results(): results = result.fetchall() print(results) pmc_nos = [row[0] for row in results] cur.close() return jsonify({'pmc_nos': pmc_nos}) # Edit Payment Route @app.route('/edit_payment/', methods=['GET', 'POST']) @login_required def edit_payment(payment_id): connection = config.get_db_connection() payment_data = {} # To hold the payment data for the given ID if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500) try: cursor = connection.cursor() # Fetch the existing payment data for the given payment_id # cursor.execute( # "SELECT Payment_Id, PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR FROM payment WHERE Payment_Id = %s", # (payment_id,) # ) # payment_data = cursor.fetchone() cursor.callproc("GetPaymentById", (payment_id,)) for result in cursor.stored_results(): payment_data = result.fetchone() # Handle POST request to update the payment if request.method == 'POST': pmc_no = request.form['PMC_No'] invoice_no = request.form['invoice_No'] amount = request.form['Payment_Amount'] tds_amount = request.form['TDS_Payment_Amount'] total_amount = request.form['total_amount'] utr = request.form['utr'] LogHelper.log_action("Edit Payment", f"User {current_user.id} Edit Payment'{ pmc_no}'") try: # cursor.execute('''UPDATE payment SET PMC_No=%s, Invoice_No=%s, Payment_Amount=%s, TDS_Payment_Amount=%s, # Total_Amount=%s, UTR=%s WHERE Payment_Id=%s''', # (pmc_no, invoice_no, amount, tds_amount, total_amount, utr, payment_id)) cursor.callproc("UpdatePayment", (payment_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr,)) connection.commit() return redirect(url_for('add_payment')) # Redirect to add_payment page to view the updated list except mysql.connector.Error as e: print(f"Error updating payment: {e}") return HtmlHelper.json_response(ResponseHandler.update_failure("payment"), 500) except mysql.connector.Error as e: print(f"Error fetching payment data: {e}") return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500) finally: cursor.close() connection.close() return render_template('edit_payment.html', payment_data=payment_data) # Delete Payment Route @app.route('/delete_payment/', methods=['GET', 'POST']) @login_required def delete_payment(payment_id): connection = config.get_db_connection() if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500) try: cursor = connection.cursor() # cursor.execute("DELETE FROM payment WHERE Payment_Id = %s", (payment_id,)) cursor.callproc("DeletePayment", (payment_id,)) LogHelper.log_action("Delete Payment", f"User {current_user.id} Delete Payment'{ payment_id}'") connection.commit() # Check if any rows were deleted if cursor.rowcount == 0: return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 404) return redirect(url_for('add_payment')) # Redirect back to the add_payment page except mysql.connector.Error as e: print(f"Error deleting payment: {e}") return HtmlHelper.json_response(ResponseHandler.delete_failure("payment"), 500) finally: cursor.close() connection.close() # --- end Payment controller ----------- # ------------------------- GST Release controller ------------------------------------------ @app.route('/add_gst_release', methods=['GET', 'POST']) @login_required def add_gst_release(): connection = config.get_db_connection() gst_releases = [] # List to hold GST Release history invoices = [] # List to hold invoices for the dropdown if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500) try: cursor = connection.cursor() # Retrieve GST Release history cursor.execute("SELECT GST_Release_Id, PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR FROM gst_release") gst_releases = cursor.fetchall() # cursor.callproc("GetAllGSTReleases") # for result in cursor.stored_results(): # gst_releases = result.fetchall() if request.method == 'POST': pmc_no = request.form['PMC_No'] invoice_no = request.form['invoice_No'] basic_amount = request.form['basic_amount'] final_amount = request.form['final_amount'] total_amount = request.form['total_amount'] utr = request.form['utr'] contractor_id = request.form['subcontractor_id'] LogHelper.log_action("Add gst_release", f"User {current_user.id} Add gst_release'{ pmc_no}'") # cursor.callproc('SaveGSTRelease', ( # pmc_no, invoice_no, basic_amount, final_amount,total_amount, utr # )) # connection.commit() cursor.execute(""" INSERT INTO gst_release (PMC_No, invoice_no, Basic_Amount, Final_Amount, Total_Amount, UTR, Contractor_Id) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id )) connection.commit() return redirect(url_for('add_gst_release')) # Redirect to add_gst_release page except mysql.connector.Error as e: print(f"Error: {e}") return HtmlHelper.json_response(ResponseHandler.add_failure("GST Release"), 500) finally: cursor.close() connection.close() return render_template('add_gst_release.html', invoices=invoices, gst_releases=gst_releases) # update gst Release by id @app.route('/edit_gst_release/', methods=['GET', 'POST']) @login_required def edit_gst_release(gst_release_id): connection = config.get_db_connection() gst_release_data = {} # To hold the GST release data for the given ID invoices = [] # List to hold invoices for the dropdown if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500) try: cursor = connection.cursor() # Fetch the existing GST release data for the given gst_release_id cursor.execute( "SELECT GST_Release_Id, PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR FROM gst_release WHERE GST_Release_Id = %s", (gst_release_id,) ) gst_release_data = cursor.fetchone() # cursor.callproc("GetGSTReleaseById", (gst_release_id,)) # for result in cursor.stored_results(): # gst_release_data = result.fetchone() if request.method == 'POST': pmc_no = request.form['PMC_No'] invoice_no = request.form['invoice_No'] basic_amount = request.form['basic_amount'] final_amount = request.form['final_amount'] total_amount = request.form['total_amount'] utr = request.form['utr'] LogHelper.log_action("Edit gst_release", f"User {current_user.id} Edit gst_release'{ pmc_no}'") try: cursor.execute(""" UPDATE gst_release SET PMC_No = %s, invoice_no = %s, Basic_Amount = %s, Final_Amount = %s, Total_Amount = %s, UTR = %s WHERE GST_Release_Id = %s """, ( pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, gst_release_id )) # cursor.callproc("UpdateGSTRelease", (gst_release_id, pmc_id, invoice_no, basic_amount, final_amount)) # # connection.commit() return redirect(url_for('add_gst_release')) # Redirect to the page to view the updated list except mysql.connector.Error as e: print(f"Error updating GST Release: {e}") return HtmlHelper.json_response(ResponseHandler.update_failure("GST Release"), 500) except mysql.connector.Error as e: print(f"Error fetching GST Release data: {e}") return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500) finally: cursor.close() connection.close() return render_template('edit_gst_release.html', gst_release_data=gst_release_data, invoices=invoices) # delete gst release by id @app.route('/delete_gst_release/', methods=['GET', 'POST']) @login_required def delete_gst_release(gst_release_id): connection = config.get_db_connection() if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500) try: cursor = connection.cursor() # cursor.execute("DELETE FROM gst_release WHERE GST_Release_Id = %s", (gst_release_id,)) cursor.callproc("DeleteGSTRelease", (gst_release_id,)) LogHelper.log_action("delete gst_release", f"User {current_user.id} delete gst_release'{ gst_release_id}'") connection.commit() # Check if any rows were deleted if cursor.rowcount == 0: return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 404) return redirect(url_for('add_gst_release')) # Redirect to the add_gst_release page except mysql.connector.Error as e: print(f"Error deleting GST Release: {e}") return HtmlHelper.json_response(ResponseHandler.delete_failure("GST Release"), 500) finally: cursor.close() connection.close() # --- end GST Release controller ----- # ------------------------- Subcontractor controller ------------------------------------------ @app.route('/subcontractor', methods=['GET', 'POST']) @login_required def subcontract(): connection = config.get_db_connection() subcontractor = [] if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) try: cursor = connection.cursor() if request.method == 'GET': try: # cursor.execute('SELECT * FROM subcontractors;') # subcontractor = cursor.fetchall() # Fetch the current subcontractor list # connection.commit() cursor.callproc('GetAllSubcontractors') for result in cursor.stored_results(): subcontractor = result.fetchall() except Error as e: print(f"Error fetching data: {e}") return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) if request.method == 'POST': contractor_data = { 'Contractor_Name': request.form['Contractor_Name'], 'Address': request.form['Address'], 'Mobile_No': request.form['Mobile_No'], 'PAN_No': request.form['PAN_No'], 'Email': request.form['Email'], 'Gender': request.form['Gender'], 'GST_Registration_Type': request.form['GST_Registration_Type'], 'GST_No': request.form['GST_No'], 'Contractor_password': request.form['Contractor_password'], } try: cursor.callproc('SaveContractor', ( contractor_data['Contractor_Name'], contractor_data['Address'], contractor_data['Mobile_No'], contractor_data['PAN_No'], contractor_data['Email'], contractor_data['Gender'], contractor_data['GST_Registration_Type'], contractor_data['GST_No'], contractor_data['Contractor_password'] )) connection.commit() # Re-fetch subcontractors after inserting the new one # cursor.execute('SELECT * FROM subcontractors') # subcontractor = cursor.fetchall() cursor.callproc('GetAllSubcontractors') for result in cursor.stored_results(): subcontractor = result.fetchall() except Error as e: print(f"Error inserting data: {e}") return HtmlHelper.json_response(ResponseHandler.add_failure("Subcontractor"), 500) except Error as e: print(f"Error handling subcontractor data: {e}") return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) finally: cursor.close() connection.close() return render_template('add_subcontractor.html', subcontractor=subcontractor) # update subcontractor by id @app.route('/edit_subcontractor/', methods=['GET', 'POST']) @login_required def edit_subcontractor(id): connection = config.get_db_connection() if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) try: cursor = connection.cursor() subcontractor = None # Fetch existing subcontractor data by ID # cursor.execute('SELECT * FROM subcontractors WHERE Contractor_Id = %s', (id,)) # subcontractor = cursor.fetchone() cursor.callproc("GetSubcontractorById", (id,)) for contractors in cursor.stored_results(): subcontractor = contractors.fetchone() if not subcontractor: return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 404) if request.method == 'POST': updated_data = { 'Contractor_Name': request.form['Contractor_Name'], 'Address': request.form['Address'], 'Mobile_No': request.form['Mobile_No'], 'PAN_No': request.form['PAN_No'], 'Email': request.form['Email'], 'Gender': request.form['Gender'], 'GST_Registration_Type': request.form['GST_Registration_Type'], 'GST_No': request.form['GST_No'], 'Contractor_password': request.form['Contractor_password'], 'id': id } LogHelper.log_action("Edit Subcontractor", f"User {current_user.id}Edit Subcontractor'{ id}'") try: # cursor.execute("""UPDATE subcontractors SET # Contractor_Name=%(Contractor_Name)s, # Address=%(Address)s, # Mobile_No=%(Mobile_No)s, # PAN_No=%(PAN_No)s, # Email=%(Email)s, # Gender=%(Gender)s, # GST_Registration_Type=%(GST_Registration_Type)s, # GST_No=%(GST_No)s, # Contractor_password=%(Contractor_password)s # WHERE Contractor_Id=%(id)s""", updated_data) cursor.callproc("UpdateSubcontractor", ( id, updated_data['Contractor_Name'], updated_data['Address'], updated_data['Mobile_No'], updated_data['PAN_No'], updated_data['Email'], updated_data['Gender'], updated_data['GST_Registration_Type'], updated_data['GST_No'], updated_data['Contractor_password'] )) connection.commit() return redirect(url_for('subcontract')) except Error as e: print(f"Error updating subcontractor: {e}") return HtmlHelper.json_response(ResponseHandler.update_failure("Subcontractor"), 500) except Error as e: print(f"Error fetching subcontractor data: {e}") return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) finally: cursor.close() connection.close() return render_template('edit_subcontractor.html', subcontractor=subcontractor) # delete Sub-Contractor methods by id .. # @app.route('/deleteSubContractor/', methods=['GET', 'POST']) # def deleteSubContractor(id): # connection = config.get_db_connection() # if not connection: # return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) # try: # cursor = connection.cursor() # # cursor.execute("DELETE FROM subcontractors WHERE Contractor_Id = %s", (id,)) # cursor.callproc("DeleteSubcontractor", (id,)) # connection.commit() # # Check if any row was deleted (subcontractor found) # if cursor.rowcount == 0: # return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 404) # except Error as e: # print(f"Error deleting subcontractor: {e}") # return HtmlHelper.json_response(ResponseHandler.delete_failure("Subcontractor"), 500) # finally: # cursor.close() # connection.close() # return redirect(url_for('subcontract')) @app.route('/deleteSubContractor/', methods=['GET', 'POST']) @login_required def deleteSubContractor(id): connection = config.get_db_connection() if not connection: return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) try: cursor = connection.cursor() # Optional: check if subcontractor exists before attempting delete cursor.execute("SELECT 1 FROM subcontractors WHERE Contractor_Id = %s", (id,)) if cursor.fetchone() is None: return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor not found"), 404) # Call stored procedure to delete subcontractor and related records cursor.callproc("DeleteSubcontractor", (id,)) connection.commit() # Retrieve result from procedure (SELECT ROW_COUNT()) affected_rows = 0 for result in cursor.stored_results(): row = result.fetchone() affected_rows = row[0] if row else 0 LogHelper.log_action("Delete Subcontractor", f"User {current_user.id}Delete Subcontractor'{ id}'") if affected_rows == 0: return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor not deleted"), 404) except Error as e: print(f"Error deleting subcontractor: {e}") return HtmlHelper.json_response(ResponseHandler.delete_failure("Subcontractor"), 500) finally: cursor.close() connection.close() return redirect(url_for('subcontract')) # redirect to subcontractor list page # ------------------------------- Show Report Subcontractor --------------------- UPLOAD_FOLDER = 'uploads' app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) # Upload Excel file html page @app.route('/upload_excel_file', methods=['GET', 'POST']) def upload(): if request.method == 'POST': file = request.files['file'] if file and file.filename.endswith('.xlsx'): filepath = os.path.join(app.config['UPLOAD_FOLDER'], file.filename) file.save(filepath) LogHelper.log_action("Upload Excel File", f"User {current_user.id}Upload Excel File'{file}'") return redirect(url_for('show_table', filename=file.filename)) return render_template('uploadExcelFile.html') # Show excel data in tables6 # @app.route('/show_table/') # def show_table(filename): # global data # data = [] # # filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) # wb = openpyxl.load_workbook(filepath, data_only=True) # sheet = wb.active # # # Extract key file information from the first 4 rows # file_info = { # "Subcontractor": sheet.cell(row=1, column=2).value, # "State": sheet.cell(row=2, column=2).value, # "District": sheet.cell(row=3, column=2).value, # "Block": sheet.cell(row=4, column=2).value, # } # # errors = [] # subcontractor_data = None # state_data = None # district_data = None # block_data = None # # # Database connection # connection = config.get_db_connection() # if connection: # try: # cursor = connection.cursor(dictionary=True) # # # Validate State # # cursor.execute("SELECT State_ID, State_Name FROM states WHERE State_Name = %s", (file_info['State'],)) # # state_data = cursor.fetchone() # cursor.callproc('GetStateByName', [file_info['State']]) # for result in cursor.stored_results(): # state_data = result.fetchone() # # if not state_data: # errors.append(f"State '{file_info['State']}' is not valid. Please add it.") # # # Validate District # if state_data: # # cursor.execute( # # "SELECT District_ID, District_Name FROM districts WHERE District_Name = %s AND State_ID = %s", # # (file_info['District'], state_data['State_ID']) # # ) # # district_data = cursor.fetchone() # cursor.callproc('GetDistrictByNameAndStates', [file_info['District'], state_data['State_ID']]) # for result in cursor.stored_results(): # district_data = result.fetchone() # # if not district_data: # errors.append( # f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.") # # # Validate Block # if district_data: # # cursor.execute( # # "SELECT Block_Id, Block_Name FROM blocks WHERE Block_Name = %s AND District_ID = %s", # # (file_info['Block'], district_data['District_ID']) # # ) # # block_data = cursor.fetchone() # cursor.callproc('GetBlockByNameAndDistricts', [file_info['Block'], district_data['District_ID']]) # for result in cursor.stored_results(): # block_data = result.fetchone() # # if not block_data: # errors.append( # f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.") # # # old code # # # Validate Subcontractor # # cursor.execute("SELECT Contractor_Id, Contractor_Name FROM SubContractors WHERE Contractor_Name = %s", # # (file_info['Subcontractor'],)) # # subcontractor_data = cursor.fetchone() # cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']]) # for result in cursor.stored_results(): # subcontractor_data = result.fetchone() # # if not subcontractor_data: # # cursor.execute("INSERT INTO subcontractors (Contractor_Name) VALUES (%s)", # # (file_info['Subcontractor'],)) # # connection.commit() # cursor.callproc('InsertSubcontractor', [file_info['Subcontractor']]) # connection.commit() # # # cursor.execute("SELECT Contractor_Id, Contractor_Name FROM SubContractors WHERE Contractor_Name = %s", # # (file_info['Subcontractor'],)) # # subcontractor_data = cursor.fetchone() # cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']]) # for result in cursor.stored_results(): # subcontractor_data = result.fetchone() # # # new code # # cursor.callproc('ValidateAndInsertSubcontractor', (file_info['Subcontractor'], 0, '')) # # # # for con in cursor.stored_results(): # # subcontractor_data = con.fetchone() # # print("subcon:",subcontractor_data) # # # # print("subcontractor_data",subcontractor_data) # # # Get hold types data from database (for faster lookup) # # cursor.execute("SELECT hold_type_id, hold_type FROM hold_types") # # hold_types_data = cursor.fetchall() # # cursor.callproc("GetAllHoldTypes") # for ht in cursor.stored_results(): # hold_types_data = ht.fetchall() # # hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if # row['hold_type']} # # cursor.close() # except mysql.connector.Error as e: # print(f"Database error: {e}") # return "Database operation failed", 500 # finally: # connection.close() # # # Extract dynamic variable names from row 5 and detect "hold" columns # variables = {} # hold_columns = [] # hold_counter = 0 # # for j in range(1, sheet.max_column + 1): # col_value = sheet.cell(row=5, column=j).value # if col_value: # variables[col_value] = j # Store column name with its position # # # Check if the column header contains the word 'hold' # if 'hold' in str(col_value).lower(): # hold_counter += 1 # # Lookup hold type id from database # hold_type_key = str(col_value).lower().strip() # hold_type_id = hold_types_lookup.get(hold_type_key, None) # hold_columns.append({ # 'column_name': col_value, # 'column_number': j, # 'hold_type_id': hold_type_id # }) # # # Extract data dynamically based on row numbers # for i in range(6, sheet.max_row + 1): # row_data = {} # if sheet.cell(row=i, column=1).value: # row_data["Row Number"] = i # Store row number # for var_name, col_num in variables.items(): # row_data[var_name] = sheet.cell(row=i, column=col_num).value # # Check if at least 4 non-empty cells exist in the row # if sum(1 for value in row_data.values() if value) >= 4: # data.append(row_data) # # # For debugging or console output, you can print the hold columns info # for hold in hold_columns: # if hold['hold_type_id']: # print( # f" if Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}") # else: # errors.append( # f"Hold Type not added ! Column name '{hold['column_name']}'.") # print( # f" else Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}") # # return render_template( # 'show_excel_file.html', # file_info=file_info, # variables=variables, # data=data, # subcontractor_data=subcontractor_data, # state_data=state_data, # district_data=district_data, # block_data=block_data, # errors=errors, # hold_columns=hold_columns, # hold_counter=hold_counter # ) @app.route('/show_table/') def show_table(filename): global data data = [] filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) wb = openpyxl.load_workbook(filepath, data_only=True) sheet = wb.active file_info = { "Subcontractor": sheet.cell(row=1, column=2).value, "State": sheet.cell(row=2, column=2).value, "District": sheet.cell(row=3, column=2).value, "Block": sheet.cell(row=4, column=2).value, } errors = [] subcontractor_data = None state_data = None district_data = None block_data = None connection = config.get_db_connection() if connection: try: cursor = connection.cursor(dictionary=True) print(f"Calling GetStateByName with: {file_info['State']}") cursor.callproc('GetStateByName', [file_info['State']]) for result in cursor.stored_results(): state_data = result.fetchone() if not state_data: errors.append(f"State '{file_info['State']}' is not valid. Please add it.") if state_data: print(f"Calling GetDistrictByNameAndStates with: {file_info['District']}, {state_data['State_ID']}") cursor.callproc('GetDistrictByNameAndStates', [file_info['District'], state_data['State_ID']]) for result in cursor.stored_results(): district_data = result.fetchone() if not district_data: errors.append(f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.") if district_data: print(f"Calling GetBlockByNameAndDistricts with: {file_info['Block']}, {district_data['District_ID']}") cursor.callproc('GetBlockByNameAndDistricts', [file_info['Block'], district_data['District_ID']]) for result in cursor.stored_results(): block_data = result.fetchone() if not block_data: errors.append(f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.") print(f"Calling GetSubcontractorByName with: {file_info['Subcontractor']}") cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']]) for result in cursor.stored_results(): subcontractor_data = result.fetchone() if not subcontractor_data: print(f"Inserting subcontractor: {file_info['Subcontractor']}") cursor.callproc('InsertSubcontractor', [file_info['Subcontractor']]) connection.commit() print(f"Calling GetSubcontractorByName again with: {file_info['Subcontractor']}") cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']]) for result in cursor.stored_results(): subcontractor_data = result.fetchone() print("Calling GetAllHoldTypes") cursor.callproc("GetAllHoldTypes") hold_types_data = [] for ht in cursor.stored_results(): hold_types_data = ht.fetchall() hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if row['hold_type']} cursor.close() except mysql.connector.Error as e: print(f"Database error: {e}") return f"Database operation failed: {e}", 500 finally: connection.close() variables = {} hold_columns = [] hold_counter = 0 for j in range(1, sheet.max_column + 1): col_value = sheet.cell(row=5, column=j).value if col_value: variables[col_value] = j if 'hold' in str(col_value).lower(): hold_counter += 1 hold_type_key = str(col_value).lower().strip() hold_type_id = hold_types_lookup.get(hold_type_key, None) hold_columns.append({ 'column_name': col_value, 'column_number': j, 'hold_type_id': hold_type_id }) for i in range(6, sheet.max_row + 1): row_data = {} if sheet.cell(row=i, column=1).value: row_data["Row Number"] = i for var_name, col_num in variables.items(): row_data[var_name] = sheet.cell(row=i, column=col_num).value if sum(1 for value in row_data.values() if value) >= 4: data.append(row_data) for hold in hold_columns: if hold['hold_type_id']: print(f" if Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}") else: errors.append(f"Hold Type not added ! Column name '{hold['column_name']}'.") print(f" else Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}") return render_template( 'show_excel_file.html', file_info=file_info, variables=variables, data=data, subcontractor_data=subcontractor_data, state_data=state_data, district_data=district_data, block_data=block_data, errors=errors, hold_columns=hold_columns, hold_counter=hold_counter ) # Show excel data in tables6 # @app.route('/show_table/') # def show_table(filename): # global data # data = [] # # filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) # wb = openpyxl.load_workbook(filepath, data_only=True) # sheet = wb.active # # # Extract key file information from the first 4 rows # file_info = { # "Subcontractor": sheet.cell(row=1, column=2).value, # "State": sheet.cell(row=2, column=2).value, # "District": sheet.cell(row=3, column=2).value, # "Block": sheet.cell(row=4, column=2).value, # } # # errors = [] # subcontractor_data = None # state_data = None # district_data = None # block_data = None # # # Database connection # connection = config.get_db_connection() # if connection: # try: # cursor = connection.cursor(dictionary=True) # # # Validate State # cursor.execute("SELECT State_ID, State_Name FROM states WHERE State_Name = %s", (file_info['State'],)) # state_data = cursor.fetchone() # if not state_data: # errors.append(f"State '{file_info['State']}' is not valid. Please add it.") # # # Validate District # if state_data: # cursor.execute( # "SELECT District_ID, District_Name FROM districts WHERE District_Name = %s AND State_ID = %s", # (file_info['District'], state_data['State_ID']) # ) # district_data = cursor.fetchone() # if not district_data: # errors.append( # f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.") # # # Validate Block # if district_data: # cursor.execute( # "SELECT Block_Id, Block_Name FROM blocks WHERE Block_Name = %s AND District_ID = %s", # (file_info['Block'], district_data['District_ID']) # ) # block_data = cursor.fetchone() # if not block_data: # errors.append( # f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.") # # # # old code # # # Validate Subcontractor # cursor.execute("SELECT Contractor_Id, Contractor_Name FROM subcontractors WHERE Contractor_Name = %s", # (file_info['Subcontractor'],)) # subcontractor_data = cursor.fetchone() # # if not subcontractor_data: # cursor.execute("INSERT INTO subcontractors (Contractor_Name) VALUES (%s)", # (file_info['Subcontractor'],)) # connection.commit() # cursor.execute("SELECT Contractor_Id, Contractor_Name FROM subcontractors WHERE Contractor_Name = %s", # (file_info['Subcontractor'],)) # subcontractor_data = cursor.fetchone() # # # new code # # cursor.callproc('ValidateAndInsertSubcontractor', (file_info['Subcontractor'], 0, '')) # # # # for con in cursor.stored_results(): # # subcontractor_data = con.fetchone() # # print("subcon:",subcontractor_data) # # # # print("subcontractor_data",subcontractor_data) # # # Get hold types data from database (for faster lookup) # # cursor.execute("SELECT hold_type_id, hold_type FROM hold_types") # # hold_types_data = cursor.fetchall() # # cursor.callproc("GetAllHoldTypes") # for ht in cursor.stored_results(): # hold_types_data = ht.fetchall() # # # hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if row['hold_type']} # # # cursor.close() # except mysql.connector.Error as e: # print(f"Database error: {e}") # # # return "Database operation failed", 500 # return f"{e}",500 # finally: # connection.close() # # # Extract dynamic variable names from row 5 and detect "hold" columns # variables = {} # hold_columns = [] # hold_counter = 0 # # for j in range(1, sheet.max_column + 1): # col_value = sheet.cell(row=5, column=j).value # if col_value: # variables[col_value] = j # Store column name with its position # # # Check if the column header contains the word 'hold' # if 'hold' in str(col_value).lower(): # hold_counter += 1 # # Lookup hold type id from database # hold_type_key = str(col_value).lower().strip() # hold_type_id = hold_types_lookup.get(hold_type_key, None) # hold_columns.append({ # 'column_name': col_value, # 'column_number': j, # 'hold_type_id': hold_type_id # }) # # # Extract data dynamically based on row numbers # for i in range(6, sheet.max_row + 1): # row_data = {} # if sheet.cell(row=i, column=1).value: # row_data["Row Number"] = i # Store row number # for var_name, col_num in variables.items(): # row_data[var_name] = sheet.cell(row=i, column=col_num).value # # Check if at least 4 non-empty cells exist in the row # if sum(1 for value in row_data.values() if value) >= 4: # data.append(row_data) # # # For debugging or console output, you can print the hold columns info # for hold in hold_columns: # if hold['hold_type_id']: # print( # f" if Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}") # else: # errors.append( # f"Hold Type not added ! Column name '{hold['column_name']}'.") # print( # f" else Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}") # # return render_template( # 'show_excel_file.html', # file_info=file_info, # variables=variables, # data=data, # subcontractor_data=subcontractor_data, # state_data=state_data, # district_data=district_data, # block_data=block_data, # errors=errors, # hold_columns=hold_columns, # hold_counter=hold_counter # ) # save Excel data @app.route('/save_data', methods=['POST']) def save_data(): # Extract form data subcontractor_id = request.form.get("subcontractor_data") state_id = request.form.get("state_data") district_id = request.form.get("district_data") block_id = request.form.get("block_data") variables = request.form.getlist('variables[]') hold_columns = request.form.get("hold_columns") hold_counter = request.form.get("hold_counter") # print("Info: ", subcontractor_id, state_id, district_id, block_id) if not data: return jsonify({"error": "No data provided to save"}), 400 if data: # print("Total number of entries in data:", len(data)) connection = config.get_db_connection() cursor = connection.cursor() try: for entry in data: save_data = { "PMC_No": entry.get("PMC_No"), "Invoice_Details": entry.get("Invoice_Details", ''), "Work_Type": 'none', "Invoice_Date": entry.get("Invoice_Date").strftime('%Y-%m-%d') if entry.get( "Invoice_Date") else None, "Invoice_No": entry.get("Invoice_No", ''), "Basic_Amount": entry.get("Basic_Amount", 0.00), "Debit_Amount": entry.get("Debit_Amount", 0.00), "After_Debit_Amount": entry.get("After_Debit_Amount", 0.00), "Amount": entry.get("Amount", 0.00), "GST_Amount": entry.get("GST_Amount", 0.00), "TDS_Amount": entry.get("TDS_Amount", 0.00), "SD_Amount": entry.get("SD_Amount", 0.00), "On_Commission": entry.get("On_Commission", 0.00), "Hydro_Testing": entry.get("Hydro_Testing", 0.00), "Hold_Amount": 0, "GST_SD_Amount": entry.get("GST_SD_Amount", 0.00), "Final_Amount": entry.get("Final_Amount", 0.00), "Payment_Amount": entry.get("Payment_Amount", 0.00), "Total_Amount": entry.get("Total_Amount", 0.00), "TDS_Payment_Amount": entry.get("TDS_Payment_Amount", 0.00), "UTR": entry.get("UTR", ''), } village_name, work_type = None, None village_id = 0 LogHelper.log_action("Data saved", f"User {current_user.id} Data saved'{ village_name}'") PMC_No = save_data.get('PMC_No') Invoice_Details = save_data.get('Invoice_Details') Invoice_Date = save_data.get('Invoice_Date') Invoice_No = save_data.get('Invoice_No') Basic_Amount = save_data.get('Basic_Amount') Debit_Amount = save_data.get('Debit_Amount') After_Debit_Amount = save_data.get('After_Debit_Amount') Amount = save_data.get('Amount') GST_Amount = save_data.get('GST_Amount') TDS_Amount = save_data.get('TDS_Amount') SD_Amount = save_data.get('SD_Amount') On_Commission = save_data.get('On_Commission') Hydro_Testing = save_data.get('Hydro_Testing') GST_SD_Amount = save_data.get('GST_SD_Amount') Final_Amount = save_data.get('Final_Amount') Payment_Amount = save_data.get('Payment_Amount') Total_Amount = save_data.get('Total_Amount') TDS_Payment_Amount = save_data.get('TDS_Payment_Amount') UTR = save_data.get('UTR') if Invoice_Details: words = Invoice_Details.lower().split() if 'village' in words: village_pos = words.index('village') village_name = " ".join(words[:village_pos]) if 'work' in words: work_pos = words.index('work') if village_name: work_type = " ".join(words[village_pos + 1:work_pos + 1]) else: work_type = " ".join(words[:work_pos + 1]) if Invoice_Details and 'village' in Invoice_Details.lower() and 'work' in Invoice_Details.lower(): print("village_name ::", village_name, "|| work_type ::", work_type) if block_id and village_name: village_id = None # cursor.execute("SELECT Village_Id FROM villages WHERE Block_Id = %s AND Village_Name = %s",(block_id, village_name)) # result = cursor.fetchone() cursor.callproc("GetVillageId", (block_id, village_name)) for result in cursor.stored_results(): result = result.fetchone() village_id = result[0] if result else None if not village_id: # cursor.execute("INSERT INTO villages (Village_Name, Block_Id) VALUES (%s, %s)", (village_name, block_id)) cursor.callproc("SaveVillage", (village_name, block_id)) # cursor.execute("SELECT Village_Id FROM villages WHERE Block_Id = %s AND Village_Name = %s",(block_id, village_name)) # result = cursor.fetchone() cursor.callproc("GetVillageId", (block_id, village_name)) for result in cursor.stored_results(): result = result.fetchone() village_id = result[0] if result else None print("village_id :", village_id) print("block_id :", block_id) print("invoice :", PMC_No, village_id, work_type, Invoice_Details, Invoice_Date, Invoice_No, Basic_Amount, Debit_Amount, After_Debit_Amount, Amount, GST_Amount, TDS_Amount, SD_Amount, On_Commission, Hydro_Testing, GST_SD_Amount, Final_Amount) # # cursor.execute("SET @p_invoice_id = 0") # cursor.callproc("SaveInvoice", ( # PMC_No, village_id, work_type, Invoice_Details, Invoice_Date, Invoice_No, # Basic_Amount, Debit_Amount, After_Debit_Amount, Amount, GST_Amount, TDS_Amount, # SD_Amount, On_Commission, Hydro_Testing, GST_SD_Amount, Final_Amount, # subcontractor_id, "@p_invoice_id" # )) # cursor.execute("SELECT @p_invoice_id") # invoice_id = cursor.fetchone()[0] args = ( PMC_No, village_id, work_type, Invoice_Details, Invoice_Date, Invoice_No, Basic_Amount, Debit_Amount, After_Debit_Amount, Amount, GST_Amount, TDS_Amount, SD_Amount, On_Commission, Hydro_Testing, GST_SD_Amount, Final_Amount, subcontractor_id, 0 ) # for result in cursor.stored_results(): # invoice_id = result.fetchone()['invoice_id'] print("All invoice Details ",args) results = cursor.callproc('SaveInvoice', args) # cursor.callproc("SaveInvoice",args) # for re in cursor.stored_results(): invoice_id = results[-1] print("invoice id from the excel ", invoice_id) if isinstance(hold_columns, str): hold_columns = ast.literal_eval(hold_columns) # Check if hold_columns is actually a list of dictionaries if isinstance(hold_columns, list) and all(isinstance(hold, dict) for hold in hold_columns): for hold in hold_columns: print(f"Processing hold: {hold}") hold_column_name = hold.get('column_name') # Get column name hold_type_id = hold.get('hold_type_id') # Get hold_type_id if hold_column_name: hold_amount = entry.get( hold_column_name) # Get the value for that specific hold column if hold_amount is not None: print(f"Processing hold type: {hold_column_name}, Hold Amount: {hold_amount}") # Insert into the invoice_subcontractor_hold_join table hold_join_data = { "Contractor_Id": subcontractor_id, "Invoice_Id": invoice_id, "hold_type_id": hold_type_id, "hold_amount": hold_amount } # insert_hold_query = """INSERT INTO invoice_subcontractor_hold_join (Contractor_Id, Invoice_Id, hold_type_id, hold_amount) # VALUES (%(Contractor_Id)s, %(Invoice_Id)s, %(hold_type_id)s, %(hold_amount)s); # """ # cursor.execute(insert_hold_query, hold_join_data) # print(f"Inserted hold join data: {hold_join_data}") cursor.callproc('InsertHoldJoinData', [ hold_join_data['Contractor_Id'], hold_join_data['Invoice_Id'], hold_join_data['hold_type_id'], hold_join_data['hold_amount'] ]) connection.commit() print(f"Inserted hold join data: {hold_join_data}") else: print(f"Invalid hold entry: {hold}") else: print("Hold columns data is not a valid list of dictionaries.") #---------------------------------------------Credit Note--------------------------------------------------------------------------- elif any(keyword in Invoice_Details.lower() for keyword in ['credit note','logging report']): print("Credit note found:", PMC_No, Invoice_No, Basic_Amount, Debit_Amount, Final_Amount, After_Debit_Amount, GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, Invoice_No) cursor.execute( """INSERT INTO credit_note (PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount, GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, Contractor_Id, invoice_no) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s)""", ( PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount, GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, subcontractor_id, Invoice_No)) #-----------------------------------------------Hold Amount---------------------------------------------------------------------- # Step 1: Normalize Invoice_Details: lowercase, trim, remove extra spaces normalized_details = re.sub(r'\s+', ' ', Invoice_Details.strip()).lower() # Step 2: Define lowercase keywords keywords = [ 'excess hold', 'ht', 'hold release amount', 'dpr excess hold amount', 'excess hold amount', 'Multi to Single layer bill', 'hold amount', 'logging report' ] # Step 3: Matching condition if any(kw in normalized_details for kw in keywords): print("✅ Match found. Inserting hold release for:", Invoice_Details) cursor.execute(""" INSERT INTO hold_release (PMC_No, Invoice_No, Invoice_Details, Basic_Amount, Total_Amount, UTR, Contractor_Id) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( PMC_No, Invoice_No, Invoice_Details, Basic_Amount, Final_Amount, UTR, subcontractor_id )) connection.commit() # ✅ Ensure changes are saved to DB print("✅ Hold release inserted for:", PMC_No, Invoice_Details) #------------------------------------------------------------------------------------------------------------------ elif Invoice_Details and any( keyword in Invoice_Details.lower() for keyword in ['gst', 'release', 'note']): print("Gst rels :", PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR, subcontractor_id) #cursor.callproc("SaveGSTRelease", (PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR)) cursor.execute( """INSERT INTO gst_release (PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR, Contractor_Id) VALUES (%s,%s, %s, %s, %s, %s, %s)""", (PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR, subcontractor_id)) # insert_payment = """INSERT INTO payment (PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR) VALUES (%s, %s, %s, %s, %s, %s)""" # cursor.execute(insert_payment, # (PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR)) if PMC_No and Total_Amount and UTR: print("Payment :", PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR ) # insert_payment = """INSERT INTO payment (PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR) VALUES (%s, %s, %s, %s, %s, %s)""" # cursor.execute(insert_payment, # (PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR)) cursor.callproc("SavePayment", (PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR )) connection.commit() return jsonify({"success": "Data saved successfully!"}), 200 # return render_template('uploadExcelFile.html') except Exception as e: connection.rollback() return jsonify({"error": f"An unexpected error occurred: {e}"}), 500 finally: cursor.close() connection.close() return render_template('index.html') # ---------------------- Report -------------------------------- # call report page @app.route('/report') def report_page(): return render_template('report.html') # Search list multiples input and search reports @app.route('/search_contractor', methods=['POST']) def search_contractor(): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) subcontractor_name = request.form.get('subcontractor_name') pmc_no = request.form.get('pmc_no') state = request.form.get('state') district = request.form.get('district') block = request.form.get('block') village = request.form.get('village') year_from = request.form.get('year_from') year_to = request.form.get('year_to') conditions = [] params = [] LogHelper.log_action("Search contractor", f"User {current_user.id} Search contractor'{ subcontractor_name}'") if subcontractor_name: conditions.append("LOWER(s.Contractor_Name) LIKE LOWER(%s)") params.append(f"%{subcontractor_name}%") if pmc_no: conditions.append("i.PMC_No = %s") params.append(pmc_no) if state: conditions.append("LOWER(st.State_Name) LIKE LOWER(%s)") params.append(f"%{state}%") if district: conditions.append("LOWER(d.District_Name) LIKE LOWER(%s)") params.append(f"%{district}%") if block: conditions.append("LOWER(b.Block_Name) LIKE LOWER(%s)") params.append(f"%{block}%") if village: conditions.append("LOWER(v.Village_Name) LIKE LOWER(%s)") params.append(f"%{village}%") if year_from and year_to: conditions.append("i.Invoice_Date BETWEEN %s AND %s") params.append(year_from) params.append(year_to) if not conditions: return jsonify({"error": "At least one field is required for search."}), 400 # query = f""" # SELECT DISTINCT s.Contractor_Id, s.Contractor_Name, i.PMC_No, st.State_Name, # d.District_Name, b.Block_Name, v.Village_Name # FROM subcontractors s # INNER JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id # INNER JOIN villages v ON asg.Village_Id = v.Village_Id # INNER JOIN invoice i ON i.Village_Id = asg.Village_Id AND i.PMC_No = asg.PMC_No # LEFT JOIN blocks b ON v.Block_Id = b.Block_Id # LEFT JOIN districts d ON b.District_id = d.District_id # LEFT JOIN states st ON d.State_Id = st.State_Id # WHERE {' AND '.join(conditions)} # ORDER BY s.Contractor_Name ASC, i.PMC_No ASC # """ # cursor.execute(query, tuple(params)) # data = cursor.fetchall() cursor.callproc("search_contractor_info", [ subcontractor_name or None, pmc_no or None, state or None, district or None, block or None, village or None, year_from or None, year_to or None ]) for result in cursor.stored_results(): data = result.fetchall() return jsonify(data) @app.route('/contractor_report/') def contractor_report(contractor_id): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True, buffered=True) try: # Contractor details cursor.execute(""" SELECT DISTINCT s.Contractor_Name, st.State_Name, d.District_Name, b.Block_Name, s.Mobile_No, s.GST_Registration_Type, s.GST_No, s.PAN_No, s.Email, s.Address FROM subcontractors s LEFT JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id LEFT JOIN villages v ON asg.Village_Id = v.Village_Id LEFT JOIN blocks b ON v.Block_Id = b.Block_Id LEFT JOIN districts d ON b.District_id = d.District_id LEFT JOIN states st ON d.State_Id = st.State_Id WHERE s.Contractor_Id = %s """, (contractor_id,)) contInfo = cursor.fetchone() # Hold types cursor.execute(""" SELECT DISTINCT ht.hold_type_id, ht.hold_type FROM invoice_subcontractor_hold_join h JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id JOIN invoice i ON h.Invoice_Id = i.Invoice_Id WHERE h.Contractor_Id = %s """, (contractor_id,)) hold_types = cursor.fetchall() # Invoices cursor.execute(""" SELECT DISTINCT i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details, i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount, i.After_Debit_Amount, i.Amount, i.GST_Amount, i.TDS_Amount, i.SD_Amount, i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount, i.Final_Amount, h.hold_amount, ht.hold_type FROM assign_subcontractors asg INNER JOIN villages v ON asg.Village_Id = v.Village_Id INNER JOIN invoice i ON i.Village_Id = v.Village_Id AND i.PMC_No = asg.PMC_No LEFT JOIN invoice_subcontractor_hold_join h ON i.Invoice_Id = h.Invoice_Id AND h.Contractor_Id = asg.Contractor_Id LEFT JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id WHERE asg.Contractor_Id = %s ORDER BY i.PMC_No ASC """, (contractor_id,)) invoices = cursor.fetchall() # GST Release cursor.execute(""" SELECT gr.pmc_no, gr.invoice_no, gr.basic_amount, gr.final_amount FROM gst_release gr INNER JOIN ( SELECT DISTINCT i.PMC_No, i.Invoice_No FROM invoice i JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No AND i.Village_Id = a.Village_Id WHERE a.Contractor_Id = %s ) x ON gr.pmc_no = x.PMC_No AND gr.invoice_no = x.Invoice_No ORDER BY gr.pmc_no ASC """, (contractor_id,)) gst_rel = cursor.fetchall() #Hold # Hold Release cursor.execute("SELECT * FROM hold_release WHERE Contractor_Id=%s", (contractor_id,)) hold_release = cursor.fetchall() print(hold_release) #Credit Note cursor.execute("select * from credit_note where Contractor_Id=%s",(contractor_id,)) credit_note=cursor.fetchall() print(credit_note) # Payments (include valid matches and payments with pmc_no but invoice_no is NULL) cursor.execute(""" SELECT p.pmc_no, p.invoice_no, p.Payment_Amount, p.TDS_Payment_Amount, p.Total_amount, p.utr FROM payment p WHERE EXISTS ( SELECT 1 FROM invoice i JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No AND i.Village_Id = a.Village_Id WHERE a.Contractor_Id = %s AND i.PMC_No = p.pmc_no AND i.Invoice_No = p.invoice_no ) OR ( p.invoice_no IS NULL AND EXISTS ( SELECT 1 FROM assign_subcontractors a WHERE a.Contractor_Id = %s AND a.PMC_No = p.pmc_no ) ) ORDER BY p.pmc_no ASC """, (contractor_id, contractor_id)) payments = cursor.fetchall() # Totals total = { "sum_invo_basic_amt": float(sum(row['Basic_Amount'] or 0 for row in invoices)), "sum_invo_debit_amt": float(sum(row['Debit_Amount'] or 0 for row in invoices)), "sum_invo_after_debit_amt": float(sum(row['After_Debit_Amount'] or 0 for row in invoices)), "sum_invo_amt": float(sum(row['Amount'] or 0 for row in invoices)), "sum_invo_gst_amt": float(sum(row['GST_Amount'] or 0 for row in invoices)), "sum_invo_tds_amt": float(sum(row['TDS_Amount'] or 0 for row in invoices)), "sum_invo_ds_amt": float(sum(row['SD_Amount'] or 0 for row in invoices)), "sum_invo_on_commission": float(sum(row['On_Commission'] or 0 for row in invoices)), "sum_invo_hydro_test": float(sum(row['Hydro_Testing'] or 0 for row in invoices)), "sum_invo_gst_sd_amt": float(sum(row['GST_SD_Amount'] or 0 for row in invoices)), "sum_invo_final_amt": float(sum(row['Final_Amount'] or 0 for row in invoices)), "sum_invo_hold_amt": float(sum(row['hold_amount'] or 0 for row in invoices)), "sum_gst_basic_amt": float(sum(row['basic_amount'] or 0 for row in gst_rel)), "sum_gst_final_amt": float(sum(row['final_amount'] or 0 for row in gst_rel)), "sum_pay_payment_amt": float(sum(row['Payment_Amount'] or 0 for row in payments)), "sum_pay_tds_payment_amt": float(sum(row['TDS_Payment_Amount'] or 0 for row in payments)), "sum_pay_total_amt": float(sum(row['Total_amount'] or 0 for row in payments)) } current_date = datetime.now().strftime('%Y-%m-%d') except Exception as e: print(f"Error fetching contractor report: {e}") return "An error occurred while fetching contractor report", 500 finally: cursor.close() connection.close() return render_template('subcontractor_report.html', contInfo=contInfo, contractor_id=contractor_id, invoices=invoices, hold_types=hold_types, gst_rel=gst_rel, payments=payments, credit_note=credit_note, hold_release=hold_release, total=total, current_date=current_date) # # Download report by contractor id # # Download report by contractor id class FilePathData: downloadReportFolder = "static/download" @app.route('/download_report/') def download_report(contractor_id): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True, buffered=True) output_folder = FilePathData.downloadReportFolder os.makedirs(output_folder, exist_ok=True) output_file = os.path.join(output_folder, f"Contractor_Report_{contractor_id}.xlsx") try: # ---------------- Contractor Info ---------------- contractor = ContractorInfo(contractor_id) contInfo = contractor.contInfo if not contractor.contInfo: return "No contractor found", 404 # ---------------- Hold Types ---------------- cursor.execute(""" SELECT DISTINCT ht.hold_type_id, ht.hold_type FROM invoice_subcontractor_hold_join h JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id WHERE h.Contractor_Id = %s """, (contractor_id,)) hold_types = cursor.fetchall() hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # ---------------- Invoices ---------------- cursor.execute(""" SELECT i.*, v.Village_Name FROM assign_subcontractors asg INNER JOIN invoice i ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id LEFT JOIN villages v ON i.Village_Id = v.Village_Id WHERE asg.Contractor_Id = %s ORDER BY i.PMC_No, i.Invoice_No """, (contractor_id,)) invoices = cursor.fetchall() # Remove duplicate invoices invoice_ids_seen = set() unique_invoices = [] for inv in invoices: if inv["Invoice_Id"] not in invoice_ids_seen: invoice_ids_seen.add(inv["Invoice_Id"]) unique_invoices.append(inv) invoices = unique_invoices # ---------------- Hold Amounts ---------------- cursor.execute(""" SELECT h.Invoice_Id, h.hold_type_id, h.hold_amount FROM invoice_subcontractor_hold_join h WHERE h.Contractor_Id = %s """, (contractor_id,)) hold_amounts = cursor.fetchall() hold_data = {} for h in hold_amounts: hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # ---------------- Payments ---------------- cursor.execute(""" SELECT DISTINCT p.pmc_no, p.invoice_no, p.Payment_Amount, p.TDS_Payment_Amount, p.Total_amount, p.utr FROM payment p INNER JOIN invoice i ON i.PMC_No = p.pmc_no AND i.invoice_no = p.invoice_no INNER JOIN assign_subcontractors asg ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id WHERE asg.Contractor_Id = %s """, (contractor_id,)) payments = cursor.fetchall() payments_map = {} for pay in payments: key = (str(pay['pmc_no']), str(pay['invoice_no'])) payments_map.setdefault(key, []).append(pay) # ---------------- Extra Payments (no invoice_no) ---------------- cursor.execute(""" SELECT pmc_no, Payment_Amount, TDS_Payment_Amount, Total_amount, utr FROM payment WHERE (invoice_no IS NULL OR invoice_no = '') AND Total_amount != 0 AND pmc_no IS NOT NULL """) extra_payments_raw = cursor.fetchall() extra_payments_map = {} for pay in extra_payments_raw: extra_payments_map.setdefault(str(pay['pmc_no']), []).append({ 'Payment_Amount': pay['Payment_Amount'], 'TDS_Payment_Amount': pay['TDS_Payment_Amount'], 'Total_amount': pay['Total_amount'], 'utr': pay['utr'] }) # ---------------- Credit Notes ---------------- cursor.execute(""" SELECT PMC_No, Invoice_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount, GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR FROM credit_note WHERE Contractor_Id = %s """, (contractor_id,)) credit_notes = cursor.fetchall() credit_note_map = {} for cn in credit_notes: key = (str(cn['PMC_No']), str(cn['Invoice_No'])) credit_note_map.setdefault(key, []).append(cn) # ---------------- GST Releases ---------------- cursor.execute(""" SELECT PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR FROM gst_release WHERE Contractor_Id = %s ORDER BY PMC_No, Invoice_No """, (contractor_id,)) gst_releases = cursor.fetchall() gst_release_map = {} for gr in gst_releases: key = (str(gr['PMC_No']), str(gr['Invoice_No'])) gst_release_map.setdefault(key, []).append(gr) # ---------------- Excel Workbook ---------------- workbook = openpyxl.Workbook() sheet = workbook.active sheet.title = "Contractor Report" # Contractor Info for field, value in contInfo.items(): sheet.append([field.replace("_", " "), value]) sheet.append([]) # Headers base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] hold_headers = [ht['hold_type'] for ht in hold_types] payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] all_headers = base_headers + hold_headers + payment_headers sheet.append(all_headers) for cell in sheet[sheet.max_row]: cell.font = Font(bold=True) cell.fill = PatternFill(start_color="ADD8E6", end_color="ADD8E6", fill_type="solid") # ---------------- Data Rows ---------------- processed_gst_releases = set() appended_credit_keys = set() previous_pmc_no = None for inv in invoices: pmc_no = str(inv["PMC_No"]) invoice_no = str(inv["invoice_no"]) key = (pmc_no, invoice_no) # Yellow separator if PMC_No changes if previous_pmc_no and pmc_no != previous_pmc_no: sheet.append([""] * len(all_headers)) yellow_fill = PatternFill(start_color="FFFF99", end_color="FFFF99", fill_type="solid") for cell in sheet[sheet.max_row]: cell.fill = yellow_fill previous_pmc_no = pmc_no # Invoice row row = [ pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] ] # Hold values invoice_holds = hold_data.get(inv["Invoice_Id"], {}) for ht_id in hold_type_map: row.append(invoice_holds.get(ht_id, "")) # Payment values payment = payments_map.get(key, [None])[0] row += [ inv["Final_Amount"], payment["Payment_Amount"] if payment else "", payment["TDS_Payment_Amount"] if payment else "", payment["Total_amount"] if payment else "", payment["utr"] if payment and payment.get("utr") else "" ] sheet.append(row) # ---------------- Extra Payments for this PMC ---------------- if pmc_no in extra_payments_map: for ep in extra_payments_map[pmc_no]: extra_row = [pmc_no] + [""] * (len(base_headers) - 1) extra_row += [""] * len(hold_headers) extra_row += [ "", ep["Payment_Amount"], ep["TDS_Payment_Amount"], ep["Total_amount"], ep.get("utr", "") ] sheet.append(extra_row) del extra_payments_map[pmc_no] # GST Releases if key in gst_release_map and key not in processed_gst_releases: for gr in gst_release_map[key]: gst_row = [ pmc_no, "", "", "GST Release Note", "", gr["Invoice_No"], gr["Basic_Amount"], "", "", "", "", "", "", "", "", "" ] gst_row += ["" for _ in hold_headers] gst_row += [gr["Final_Amount"], "", "", gr["Total_Amount"], gr.get("UTR", "")] sheet.append(gst_row) processed_gst_releases.add(key) # Credit Notes if key in credit_note_map and key not in appended_credit_keys: for cn in credit_note_map[key]: cn_row = [ pmc_no, "", "", cn.get("Invoice_Details", "Credit Note"), "", cn.get("Invoice_No", ""), cn.get("Basic_Amount", ""), cn.get("Debit_Amount", ""), cn.get("After_Debit_Amount", ""), cn.get("GST_Amount", ""), cn.get("Amount", ""), "", "", "", "", "" ] cn_row += ["" for _ in hold_headers] cn_row += [cn.get("Final_Amount", ""), "", "", cn.get("Total_Amount", ""), cn.get("UTR", "")] sheet.append(cn_row) appended_credit_keys.add(key) # ---------------- Totals ---------------- total_basic_amount = total_tds_amount = total_sd_amount = total_on_commission = 0 total_final_amount = total_total_amount = total_hold_amount = 0 start_row = 2 # skip headers for r in sheet.iter_rows(min_row=start_row, max_row=sheet.max_row, values_only=True): try: total_basic_amount += float(r[6] or 0) total_tds_amount += float(r[11] or 0) total_sd_amount += float(r[12] or 0) total_on_commission += float(r[13] or 0) total_final_amount += float(r[-5] or 0) total_total_amount += float(r[-2] or 0) total_hold_amount += sum(float(r[i] or 0) for i in range(len(base_headers), len(base_headers) + len(hold_headers))) except: continue totals_row = [ "Total", "", "", "", "", "", total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, total_on_commission, "", "" ] totals_row += [total_hold_amount for _ in hold_headers] totals_row += [total_final_amount, "", "", total_total_amount, ""] sheet.append([]) sheet.append(totals_row) for cell in sheet[sheet.max_row]: cell.font = Font(bold=True) # ---------------- Column Width ---------------- for col in sheet.columns: max_length = 0 col_letter = openpyxl.utils.get_column_letter(col[0].column) for cell in col: if cell.value: max_length = max(max_length, len(str(cell.value))) sheet.column_dimensions[col_letter].width = max_length + 2 workbook.save(output_file) workbook.close() finally: cursor.close() connection.close() return send_from_directory(output_folder, f"Contractor_Report_{contractor_id}.xlsx", as_attachment=True) @app.route('/pmc_report/') def pmc_report(pmc_no): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True, buffered=True) try: # 1. Fetch PMC info using stored procedure # cursor.execute(""" # SELECT DISTINCT a.PMC_No, a.Village_Id, v.Village_Name, b.Block_Name, # d.District_Name, s.State_Name, sc.Contractor_Id, sc.Contractor_Name, # sc.Address, sc.Mobile_No, sc.PAN_No, sc.Email, sc.Gender, # sc.GST_Registration_Type, sc.GST_No # FROM assign_subcontractors a # INNER JOIN villages v ON a.Village_Id = v.Village_Id # INNER JOIN blocks b ON v.Block_Id = b.Block_Id # INNER JOIN districts d ON b.District_id = d.District_id # INNER JOIN states s ON d.State_Id = s.State_Id # INNER JOIN subcontractors sc ON a.Contractor_Id = sc.Contractor_Id # WHERE a.pmc_no = %s # """, (pmc_no,)) # pmc_info = cursor.fetchone() cursor.callproc("GetContractorInfoByPmcNo", (pmc_no,)) pmc_info = next(cursor.stored_results()).fetchone() if not pmc_info: return "No PMC found with this number", 404 # 2. Fetch hold types using stored procedure # cursor.execute(""" # SELECT DISTINCT ht.hold_type_id, ht.hold_type # FROM invoice_subcontractor_hold_join h # JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id # JOIN invoice i ON h.Invoice_Id = i.Invoice_Id # JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No # WHERE a.PMC_No = %s AND a.Contractor_Id = %s # """, (pmc_no, pmc_info["Contractor_Id"])) # hold_types = cursor.fetchall() cursor.callproc("Get_pmc_hold_types", (pmc_no, pmc_info["Contractor_Id"])) hold_types = next(cursor.stored_results()).fetchall() hold_type_ids = [ht['hold_type_id'] for ht in hold_types] # 3. Initialize invoice data invoices = [] hold_amount_total = 0 # 4. Build invoice query if hold_type_ids: placeholders = ','.join(['%s'] * len(hold_type_ids)) query = f""" SELECT DISTINCT i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details, i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount, i.After_Debit_Amount, i.Amount, i.GST_Amount, i.TDS_Amount, i.SD_Amount, i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount, i.Final_Amount, h.hold_amount, ht.hold_type FROM invoice i LEFT JOIN villages v ON i.Village_Id = v.Village_Id LEFT JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No LEFT JOIN invoice_subcontractor_hold_join h ON i.Invoice_Id = h.Invoice_Id LEFT JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id WHERE a.PMC_No = %s AND a.Contractor_Id = %s AND (ht.hold_type_id IS NULL OR ht.hold_type_id IN ({placeholders})) ORDER BY i.Invoice_Date, i.Invoice_No """ params = [pmc_no, pmc_info["Contractor_Id"]] + hold_type_ids else: query = """ SELECT DISTINCT i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details, i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount, i.After_Debit_Amount, i.Amount, i.GST_Amount, i.TDS_Amount, i.SD_Amount, i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount, i.Final_Amount FROM invoice i LEFT JOIN villages v ON i.Village_Id = v.Village_Id LEFT JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No WHERE a.PMC_No = %s AND a.Contractor_Id = %s ORDER BY i.Invoice_Date, i.Invoice_No """ params = [pmc_no, pmc_info["Contractor_Id"]] cursor.execute(query, params) invoices = cursor.fetchall() if hold_type_ids: hold_amount_total = sum(row.get('hold_amount', 0) or 0 for row in invoices) # 5. Totals from invoices total_invo_final = sum(row.get('Final_Amount', 0) or 0 for row in invoices) # 6. GST release cursor.execute(""" SELECT pmc_no, invoice_no, basic_amount, final_amount FROM gst_release WHERE pmc_no = %s ORDER BY invoice_no ASC """, (pmc_no,)) gst_rel = cursor.fetchall() # gst_rel = cursor.fetchall() # cursor.callproc('GetGSTReleaseByPMC', [pmc_no]) # # # Fetch results # for result in cursor.stored_results(): # gst_rel = result.fetchall() total_gst_basic = sum(row.get('basic_amount', 0) or 0 for row in gst_rel) total_gst_final = sum(row.get('final_amount', 0) or 0 for row in gst_rel) # Hold Release Amount cursor.execute("""select * from hold_release where pmc_no=%s""", (pmc_no,)) hold_release = cursor.fetchall() print("All Hold Release ", hold_release) # Credit Note cursor.execute("select * from credit_note where pmc_no=%s", (pmc_no,)) credit_note = cursor.fetchall() print(credit_note) # 7. Payments cursor.execute(""" SELECT pmc_no, invoice_no, Payment_Amount, TDS_Payment_Amount, Total_amount, utr FROM payment WHERE pmc_no = %s ORDER BY invoice_no ASC """, (pmc_no,)) payments = cursor.fetchall() # cursor.callproc('GetPaymentByPMC', [pmc_no]) # # for result in cursor.stored_results(): # payments = result.fetchall() total_pay_amount = sum(row.get('Payment_Amount', 0) or 0 for row in payments) total_pay_total = sum(row.get('Total_amount', 0) or 0 for row in payments) # 8. Final totals dictionary totals = { "sum_invo_basic_amt": sum(row.get('Basic_Amount', 0) or 0 for row in invoices), "sum_invo_debit_amt": sum(row.get('Debit_Amount', 0) or 0 for row in invoices), "sum_invo_after_debit_amt": sum(row.get('After_Debit_Amount', 0) or 0 for row in invoices), "sum_invo_amt": sum(row.get('Amount', 0) or 0 for row in invoices), "sum_invo_gst_amt": sum(row.get('GST_Amount', 0) or 0 for row in invoices), "sum_invo_tds_amt": sum(row.get('TDS_Amount', 0) or 0 for row in invoices), "sum_invo_ds_amt": sum(row.get('SD_Amount', 0) or 0 for row in invoices), "sum_invo_on_commission": sum(row.get('On_Commission', 0) or 0 for row in invoices), "sum_invo_hydro_test": sum(row.get('Hydro_Testing', 0) or 0 for row in invoices), "sum_invo_gst_sd_amt": sum(row.get('GST_SD_Amount', 0) or 0 for row in invoices), "sum_invo_final_amt": total_invo_final, "sum_invo_hold_amt": hold_amount_total, "sum_gst_basic_amt": total_gst_basic, "sum_gst_final_amt": total_gst_final, "sum_pay_payment_amt": total_pay_amount, "sum_pay_tds_payment_amt": sum(row.get('TDS_Payment_Amount', 0) or 0 for row in payments), "sum_pay_total_amt": total_pay_total } except Exception as e: print(f"Error fetching PMC report: {e}") return "An error occurred while fetching PMC report", 500 finally: cursor.close() connection.close() return render_template( 'pmc_report.html', info=pmc_info, invoices=invoices, hold_types=hold_types, gst_rel=gst_rel, payments=payments, credit_note=credit_note, hold_release=hold_release, total=totals ) # # Download report by PMC No # @app.route('/download_pmc_report/') # def download_pmc_report(pmc_no): # connection = config.get_db_connection() # output_folder = "static/download" # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx") # if not os.path.exists(output_folder): # os.makedirs(output_folder) # cursor = connection.cursor(dictionary=True) # try: # # # Fetch Contractor Details using PMC No # # cursor.execute(""" # # SELECT DISTINCT s.Contractor_Id, s.Contractor_Name, st.State_Name, d.District_Name, b.Block_Name, # # s.Mobile_No, s.GST_Registration_Type, s.GST_No, s.PAN_No, s.Email, s.Address # # FROM subcontractors s # # LEFT JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id # # LEFT JOIN villages v ON asg.Village_Id = v.Village_Id # # LEFT JOIN blocks b ON v.Block_Id = b.Block_Id # # LEFT JOIN districts d ON b.District_id = d.District_id # # LEFT JOIN states st ON d.State_Id = st.State_Id # # WHERE asg.PMC_No = %s # # """, (pmc_no,)) # # contractor_info = cursor.fetchone() # cursor.callproc('GetContractorDetailsByPMC', [pmc_no]) # # Now fetch the result: # for result in cursor.stored_results(): # contractor_info = result.fetchone() # if not contractor_info: # return "No contractor found for this PMC No", 404 # # # Fetch distinct hold types present for the contractor # # cursor.execute(""" # # SELECT DISTINCT ht.hold_type_id, ht.hold_type # # FROM invoice_subcontractor_hold_join h # # JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id # # WHERE h.Contractor_Id = %s # # """, (contractor_info["Contractor_Id"],)) # # hold_types = cursor.fetchall() # cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]]) # for result in cursor.stored_results(): # hold_types = result.fetchall() # hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # # # # Fetch Invoices & GST Releases # # cursor.execute(""" # # SELECT DISTINCT i.Invoice_Id, i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details, # # i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount, # # i.After_Debit_Amount, i.GST_Amount, i.Amount, i.TDS_Amount, i.SD_Amount, # # i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount, i.Final_Amount, # # g.pmc_no AS gst_pmc_no, g.invoice_no AS gst_invoice_no, # # g.basic_amount AS gst_basic_amount, g.final_amount AS gst_final_amount # # FROM invoice i # # LEFT JOIN assign_subcontractors asg ON i.PMC_No = asg.PMC_No # # LEFT JOIN villages v ON i.Village_Id = v.Village_Id # # LEFT JOIN gst_release g ON i.PMC_No = g.pmc_no AND i.Invoice_No = g.invoice_no # # WHERE asg.PMC_No = %s # # ORDER BY i.Invoice_Date, i.Invoice_No # # """, (pmc_no,)) # # invoices = cursor.fetchall() # cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no]) # for result in cursor.stored_results(): # invoices = result.fetchall() # print("pmc_report invoice data:",invoices) # # cursor.callproc('GetInvoicesAndGSTReleasesByPMC', [pmc_no]) # # for result in cursor.stored_results(): # # invoices = result.fetchall() # # # Fetch Hold Amounts separately # # cursor.execute(""" # # SELECT h.Invoice_Id, ht.hold_type_id, h.hold_amount # # FROM invoice_subcontractor_hold_join h # # JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id # # WHERE h.Contractor_Id = %s # # """, (contractor_info["Contractor_Id"],)) # # hold_amounts = cursor.fetchall() # cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) # for result in cursor.stored_results(): # hold_amounts = result.fetchall() # # Create a mapping of invoice_id to hold amounts by type # hold_data = {} # for h in hold_amounts: # hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # # # Fetch all Payments for the PMC number # # cursor.execute(""" # # SELECT pmc_no, invoice_no, Payment_Amount, TDS_Payment_Amount, Total_amount, UTR # # FROM payment # # WHERE pmc_no = %s # # ORDER BY invoice_no # # """, (pmc_no,)) # # all_payments = cursor.fetchall() # cursor.callproc('GetAllPaymentsByPMC', [pmc_no]) # for result in cursor.stored_results(): # all_payments = result.fetchall() # # Organize payments by Invoice No (both regular and GST release notes) # payments_map = {} # extra_payments = [] # for pay in all_payments: # if pay['invoice_no']: # key = pay['invoice_no'] # if key not in payments_map: # payments_map[key] = [] # payments_map[key].append(pay) # else: # extra_payments.append(pay) # # Create Excel workbook # workbook = openpyxl.Workbook() # sheet = workbook.active # sheet.title = "PMC Report" # # Write Contractor Details # sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD.", "", ""]) # sheet.append( # ["Contractor Name", contractor_info["Contractor_Name"], " ", "GST No", contractor_info["GST_No"], " ", # "GST Type", contractor_info["GST_Registration_Type"]]) # sheet.append(["State", contractor_info["State_Name"], " ", "PAN No", contractor_info["PAN_No"], " ", "Address", # contractor_info["Address"]]) # sheet.append(["District", contractor_info["District_Name"], " ", "Mobile No", contractor_info["Mobile_No"]]) # sheet.append(["Block", contractor_info["Block_Name"], " ", "Email", contractor_info["Email"]]) # sheet.append([]) # # Table Headers - include all hold types as separate columns # base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", # "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", # "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] # hold_headers = [ht['hold_type'] for ht in hold_types] # payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] # sheet.append(base_headers + hold_headers + payment_headers) # seen_invoices = set() # seen_gst_notes = set() # processed_payments = set() # # Process invoices # for inv in invoices: # invoice_no = inv["Invoice_No"] # payments = payments_map.get(invoice_no, []) # # Process invoice row with first payment (if exists) # if invoice_no not in seen_invoices: # seen_invoices.add(invoice_no) # first_payment = payments[0] if len(payments) > 0 else None # # Base invoice data # row = [ # pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], # inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], # inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], # inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] # ] # # Add hold amounts for each hold type # invoice_holds = hold_data.get(inv["Invoice_Id"], {}) # for ht_id in hold_type_map.keys(): # row.append(invoice_holds.get(ht_id, "")) # # Add payment information # row += [ # inv["Final_Amount"], # first_payment["Payment_Amount"] if first_payment else "", # first_payment["TDS_Payment_Amount"] if first_payment else "", # first_payment["Total_amount"] if first_payment else "", # first_payment["UTR"] if first_payment else "" # ] # sheet.append(row) # if first_payment: # payment_id = f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}" # processed_payments.add(payment_id) # # Process GST release if exists (only if we have a matching GST record) # if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes: # seen_gst_notes.add(inv["gst_invoice_no"]) # # Find the payment that matches this GST release # gst_payment = None # for payment in payments[1:]: # Skip first payment (already used for invoice) # if payment['invoice_no'] == inv["gst_invoice_no"]: # gst_payment = payment # break # # If no payment found in the invoice's payments, check all payments # if not gst_payment: # gst_payments = payments_map.get(inv["gst_invoice_no"], []) # if gst_payments: # gst_payment = gst_payments[0] # # GST release row # row = [ # pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"], # inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # Empty holds for GST release # row += ["" for _ in hold_headers] # # Add payment information # row += [ # inv["gst_final_amount"], # gst_payment["Payment_Amount"] if gst_payment else "", # gst_payment["TDS_Payment_Amount"] if gst_payment else "", # gst_payment["Total_amount"] if gst_payment else "", # gst_payment["UTR"] if gst_payment else "" # ] # sheet.append(row) # if gst_payment: # payment_id = f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}" # processed_payments.add(payment_id) # # Process remaining payments as extra payments # for payment in payments[1:]: # payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", payment['invoice_no'], # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # Empty holds for extra payments # row += ["" for _ in hold_headers] # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # sheet.append(row) # processed_payments.add(payment_id) # # Process extra payments (null invoice_no) # for payment in extra_payments: # payment_id = f"null-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", "", # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # Empty holds for null invoice payments # row += ["" for _ in hold_headers] # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # sheet.append(row) # processed_payments.add(payment_id) # # Calculate totals # total_basic_amount = 0 # total_tds_amount = 0 # total_sd_amount = 0 # total_on_commission = 0 # total_hold_amount = 0 # total_final_amount = 0 # total_payment_amount = 0 # total_tds_payment_amount = 0 # total_total_paid = 0 # for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, values_only=True): # try: # total_basic_amount += float(row[6] or 0) # Basic_Amount # total_tds_amount += float(row[11] or 0) # TDS_Amount # total_sd_amount += float(row[12] or 0) # SD_Amount # total_on_commission += float(row[13] or 0) # On_Commission # total_final_amount += float(row[-5] or 0) # Final_Amount # total_payment_amount += float(row[-4] or 0) # Payment_Amount # total_tds_payment_amount += float(row[-3] or 0) # TDS_Payment # total_total_paid += float(row[-2] or 0) # Total_Paid # # Sum of hold amounts # hold_start_col = len(base_headers) # hold_end_col = hold_start_col + len(hold_headers) # total_hold_amount += sum(float(row[i] or 0) for i in range(hold_start_col, hold_end_col)) # except (ValueError, IndexError, TypeError): # continue # # Append totals row # totals_row = [ # "TOTAL", "", "", "", "", "", # total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, # total_on_commission, "", "", # Empty GST SD Amount # ] # # Add hold totals # totals_row += [total_hold_amount] + [""] * (len(hold_headers) - 1) # # Add payment totals # totals_row += [ # total_final_amount, # total_payment_amount, # total_tds_payment_amount, # total_total_paid, # "" # UTR column remains empty # ] # sheet.append([]) # sheet.append(totals_row) # # Make totals row bold # for cell in sheet[sheet.max_row]: # cell.font = Font(bold=True) # # Save Excel file # workbook.save(output_file) # workbook.close() # finally: # cursor.close() # connection.close() # return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True) # @app.route('/download_pmc_report/') # def download_pmc_report(pmc_no): # connection = config.get_db_connection() # output_folder = "static/download" # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx") # # if not os.path.exists(output_folder): # os.makedirs(output_folder) # # cursor = connection.cursor(dictionary=True) # # try: # cursor.callproc('GetContractorDetailsByPMC', [pmc_no]) # # for result in cursor.stored_results(): # contractor_info = result.fetchone() # # if not contractor_info: # return "No contractor found for this PMC No", 404 # # cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]]) # # for result in cursor.stored_results(): # hold_types = result.fetchall() # # hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # # cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no]) # # for result in cursor.stored_results(): # invoices = result.fetchall() # total_tds=Decimal('0.00') # final_amount=Decimal('0.00') # # total_hold_amount=Decimal('0.00') # for data in invoices: # total_tds=total_tds+data.get('TDS_Amount',Decimal('0.00')) # final_amount=final_amount+data.get('Final_Amount',Decimal('0.00')) # # cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) # # for result in cursor.stored_results(): # hold_amounts = result.fetchall() # # hold_data = {} # for h in hold_amounts: # hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # # cursor.callproc('GetAllPaymentsByPMC', [pmc_no]) # # for result in cursor.stored_results(): # all_payments = result.fetchall() # total_amount=Decimal('0.00') # for d in all_payments: # total_amount=total_amount+ d.get('Total_Amount',Decimal('0.00')) # total_amount_paid= final_amount- total_amount; # payments_map = {} # extra_payments = [] # for pay in all_payments: # if pay['invoice_no']: # key = pay['invoice_no'] # if key not in payments_map: # payments_map[key] = [] # payments_map[key].append(pay) # else: # extra_payments.append(pay) # # workbook = openpyxl.Workbook() # sheet = workbook.active # sheet.title = "PMC Report" # # # Write Contractor Details # sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD.", "", ""]) # sheet.append( # ["Contractor Name", contractor_info["Contractor_Name"], " ", "GST No", contractor_info["GST_No"], " ", # "GST Type", contractor_info["GST_Registration_Type"]]) # sheet.append(["State", contractor_info["State_Name"], " ", "PAN No", contractor_info["PAN_No"], " ", "Address", # contractor_info["Address"]]) # sheet.append(["District", contractor_info["District_Name"], " ", "Mobile No", contractor_info["Mobile_No"]]) # sheet.append(["Block", contractor_info["Block_Name"], " ", "Email", contractor_info["Email"]]) # sheet.append([]) # # # Table Headers - include all hold types as separate columns # base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", # "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", # "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] # # hold_headers = [ht['hold_type'] for ht in hold_types] # # payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] # # sheet.append(base_headers + hold_headers + payment_headers) # # seen_invoices = set() # seen_gst_notes = set() # processed_payments = set() # # # Process invoices # for inv in invoices: # invoice_no = inv["Invoice_No"] # payments = payments_map.get(invoice_no, []) # # # Process invoice row with first payment (if exists) # if invoice_no not in seen_invoices: # seen_invoices.add(invoice_no) # first_payment = payments[0] if len(payments) > 0 else None # # # Base invoice data # row = [ # pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], # inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], # inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], # inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] # ] # # # Add hold amounts for each hold type # invoice_holds = hold_data.get(inv["Invoice_Id"], {}) # for ht_id in hold_type_map.keys(): # row.append(invoice_holds.get(ht_id, "")) # # # Add payment information # row += [ # inv["Final_Amount"], # first_payment["Payment_Amount"] if first_payment else "", # first_payment["TDS_Payment_Amount"] if first_payment else "", # first_payment["Total_amount"] if first_payment else "", # first_payment["UTR"] if first_payment else "" # ] # # sheet.append(row) # # if first_payment: # payment_id = f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}" # processed_payments.add(payment_id) # # # Process GST release if exists (only if we have a matching GST record) # if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes: # seen_gst_notes.add(inv["gst_invoice_no"]) # # # Find the payment that matches this GST release # gst_payment = None # for payment in payments[1:]: # Skip first payment (already used for invoice) # if payment['invoice_no'] == inv["gst_invoice_no"]: # gst_payment = payment # break # # # If no payment found in the invoice's payments, check all payments # if not gst_payment: # gst_payments = payments_map.get(inv["gst_invoice_no"], []) # if gst_payments: # gst_payment = gst_payments[0] # # # GST release row (this will be in the same row, after the invoice information) # gst_row = [ # pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"], # inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # # Empty holds for GST release # gst_row += ["" for _ in hold_headers] # # # Add GST payment information (same columns as invoice payment information) # gst_row += [ # inv["gst_final_amount"], # gst_payment["Payment_Amount"] if gst_payment else "", # gst_payment["TDS_Payment_Amount"] if gst_payment else "", # gst_payment["Total_amount"] if gst_payment else "", # gst_payment["UTR"] if gst_payment else "" # ] # # sheet.append(gst_row) # # if gst_payment: # payment_id = f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}" # processed_payments.add(payment_id) # # # Process remaining payments as extra payments (if any) # for payment in payments[1:]: # payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", payment['invoice_no'], # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # # Empty holds for extra payments # row += ["" for _ in hold_headers] # # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # # sheet.append(row) # processed_payments.add(payment_id) # # # Process extra payments (null invoice_no) # for payment in extra_payments: # payment_id = f"null-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", "", # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # # Empty holds for null invoice payments # row += ["" for _ in hold_headers] # # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # # sheet.append(row) # processed_payments.add(payment_id) # # # Calculate totals # total_basic_amount = 0 # total_tds_amount = 0 # total_sd_amount = 0 # total_on_commission = 0 # total_hold_amount = 0 # total_final_amount = 0 # total_payment_amount = 0 # total_tds_payment_amount = 0 # total_total_paid = 0 # # for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, values_only=True): # try: # total_basic_amount += float(row[6] or 0) # Basic_Amount # total_tds_amount += float(row[11] or 0) # TDS_Amount # total_sd_amount += float(row[12] or 0) # SD_Amount # total_on_commission += float(row[13] or 0) # On_Commission # total_final_amount += float(row[-5] or 0) # Final_Amount # total_payment_amount += float(row[-4] or 0) # Payment_Amount # total_tds_payment_amount += float(row[-3] or 0) # TDS_Payment # total_total_paid += float(row[-2] or 0) # Total_Paid # # # Sum of hold amounts # hold_start_col = len(base_headers) # hold_end_col = hold_start_col + len(hold_headers) # total_hold_amount += sum(float(row[i] or 0) for i in range(hold_start_col, hold_end_col)) # except (ValueError, IndexError, TypeError): # continue # # # Append totals row # totals_row = [ # "TOTAL", "", "", "", "", "", # total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, # total_on_commission, "", "", # Empty GST SD Amount # ] # if hold_headers: # totals_row += [total_hold_amount] + [""] * (len(hold_headers) - 1) # # # Add payment totals # totals_row += [ # total_final_amount, # total_payment_amount, # total_tds_payment_amount, # total_total_paid, # "" # UTR column remains empty # ] # # sheet.append([]) # sheet.append(totals_row) # #new code added for small chart---summary # total_hold_amount=Decimal('0.00') # for d in invoices: # total_hold_amount = total_hold_amount + d.get('SD_Amount', Decimal('0.00')) + d.get('On_Commission', # Decimal( # '0.00')) + d.get( # 'Hydro_Testing', Decimal('0.00')) # for data in hold_amounts: # total_hold_amount = total_hold_amount + data.get('hold_amount', Decimal('0.00')) # print("Total Hold Amount after adding the hold amount ", total_hold_amount) # # # Add payment information # # Get today's date # today_date = datetime.today().strftime('%A,%Y-%m-%d') # # Add headers (optional) # sheet.append(["Contractor Name", contractor_info["Contractor_Name"]]) # sheet.append(["Date", today_date]) # sheet.append(["Description", "Amount"]) # # Add your values # sheet.append(["Advance/Surplus", str(total_final_amount-total_payment_amount)]) # sheet.append(["Total Hold Amount", str(total_hold_amount)]) # sheet.append(["Amount With TDS", str(total_tds_payment_amount)]) # # new coded ended here for summary chart # # Make totals row bold # for cell in sheet[sheet.max_row]: # cell.font = Font(bold=True) # # # Save Excel file # workbook.save(output_file) # workbook.close() # # finally: # cursor.close() # connection.close() # # return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True) # @app.route('/download_pmc_report/') # def download_pmc_report(pmc_no): # connection = config.get_db_connection() # output_folder = "static/download" # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx") # # if not os.path.exists(output_folder): # os.makedirs(output_folder) # # cursor = connection.cursor(dictionary=True) # # try: # cursor.callproc('GetContractorDetailsByPMC', [pmc_no]) # contractor_info = next(cursor.stored_results()).fetchone() # # if not contractor_info: # return "No contractor found for this PMC No", 404 # # cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]]) # hold_types = next(cursor.stored_results()).fetchall() # hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # # cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no]) # invoices = next(cursor.stored_results()).fetchall() # # cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) # hold_amounts = next(cursor.stored_results()).fetchall() # hold_data = {} # for h in hold_amounts: # hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # # cursor.callproc('GetAllPaymentsByPMC', [pmc_no]) # all_payments = next(cursor.stored_results()).fetchall() # # payments_map = {} # extra_payments = [] # for pay in all_payments: # if pay['invoice_no']: # payments_map.setdefault(pay['invoice_no'], []).append(pay) # else: # extra_payments.append(pay) # # workbook = openpyxl.Workbook() # sheet = workbook.active # sheet.title = "PMC Report" # # # Write contractor header # sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."]) # sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]]) # sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]]) # sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]]) # sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]]) # sheet.append([]) # # base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", # "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", # "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] # # hold_headers = [ht['hold_type'] for ht in hold_types] # payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] # sheet.append(base_headers + hold_headers + payment_headers) # # Style the headers # header_fill=PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid") # header_font=Font(bold=True) # for cell in sheet[sheet.max_row]: # cell.font=header_font # cell.fill=header_fill # # seen_invoices = set() # seen_gst_notes = set() # processed_payments = set() # # for inv in invoices: # invoice_no = inv["Invoice_No"] # payments = payments_map.get(invoice_no, []) # # if invoice_no not in seen_invoices: # seen_invoices.add(invoice_no) # first_payment = payments[0] if payments else None # # row = [ # pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], # inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], # inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], # inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] # ] # # invoice_holds = hold_data.get(inv["Invoice_Id"], {}) # for ht_id in hold_type_map.keys(): # row.append(invoice_holds.get(ht_id, "")) # # row += [ # inv["Final_Amount"], # first_payment["Payment_Amount"] if first_payment else "", # first_payment["TDS_Payment_Amount"] if first_payment else "", # first_payment["Total_amount"] if first_payment else "", # first_payment["UTR"] if first_payment else "" # ] # # sheet.append(row) # # if first_payment: # processed_payments.add(f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}") # # if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes: # seen_gst_notes.add(inv["gst_invoice_no"]) # gst_payment = None # for payment in payments[1:]: # if payment['invoice_no'] == inv["gst_invoice_no"]: # gst_payment = payment # break # if not gst_payment: # gst_payment = payments_map.get(inv["gst_invoice_no"], [None])[0] # # gst_row = [ # pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"], # inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # ] # gst_row += ["" for _ in hold_headers] # gst_row += [ # inv["gst_final_amount"], # gst_payment["Payment_Amount"] if gst_payment else "", # gst_payment["TDS_Payment_Amount"] if gst_payment else "", # gst_payment["Total_amount"] if gst_payment else "", # gst_payment["UTR"] if gst_payment else "" # ] # sheet.append(gst_row) # if gst_payment: # processed_payments.add(f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}") # # for payment in payments[1:]: # payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [pmc_no, "", "", "", "", payment['invoice_no']] + [""] * 10 # row += ["" for _ in hold_headers] # row += [ # "", payment["Payment_Amount"], payment["TDS_Payment_Amount"], # payment["Total_amount"], payment["UTR"] # ] # sheet.append(row) # processed_payments.add(payment_id) # # for payment in extra_payments: # row = [pmc_no, "", "", "", "", ""] + [""] * 10 # row += ["" for _ in hold_headers] # row += [ # "", payment["Payment_Amount"], payment["TDS_Payment_Amount"], # payment["Total_amount"], payment["UTR"] # ] # sheet.append(row) # # # Totals # total_basic_amount = Decimal('0.00') # total_tds_amount = Decimal('0.00') # total_sd_amount = Decimal('0.00') # total_on_commission = Decimal('0.00') # total_final_amount = Decimal('0.00') # total_payment_amount = Decimal('0.00') # total_tds_payment_amount = Decimal('0.00') # total_total_paid = Decimal('0.00') # total_hold_amount_dynamic = Decimal('0.00') # # for row in sheet.iter_rows(min_row=8, max_row=sheet.max_row, values_only=True): # try: # total_basic_amount += Decimal(str(row[6] or 0)) # total_tds_amount += Decimal(str(row[11] or 0)) # total_sd_amount += Decimal(str(row[12] or 0)) # total_on_commission += Decimal(str(row[13] or 0)) # total_final_amount += Decimal(str(row[-5] or 0)) # total_payment_amount += Decimal(str(row[-4] or 0)) # total_tds_payment_amount += Decimal(str(row[-3] or 0)) # total_total_paid += Decimal(str(row[-2] or 0)) # # for i in range(len(base_headers), len(base_headers) + len(hold_headers)): # total_hold_amount_dynamic += Decimal(str(row[i] or 0)) # except: # continue # # totals_row = [ # "TOTAL", "", "", "", "", "", # total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, # total_on_commission, "", "" # ] # totals_row += [total_hold_amount_dynamic] + [""] * (len(hold_headers) - 1) # totals_row += [ # total_final_amount, # total_payment_amount, # total_tds_payment_amount, # total_total_paid, # "" # ] # # sheet.append([]) # sheet.append(totals_row) # # # Summary # summary_hold = Decimal('0.00') # for d in invoices: # summary_hold += Decimal(str(d.get('SD_Amount', 0.00))) + Decimal(str(d.get('On_Commission', 0.00))) + Decimal(str(d.get('Hydro_Testing', 0.00))) # for h in hold_amounts: # summary_hold += Decimal(str(h.get('hold_amount', 0.00))) # # sheet.append([]) # today = datetime.today().strftime('%A, %Y-%m-%d') # sheet.append(["Contractor Name", contractor_info["Contractor_Name"]]) # sheet.append(["Date", today]) # sheet.append(["Description", "Amount"]) # sheet.append(["Advance/Surplus", str(total_final_amount - total_payment_amount)]) # sheet.append(["Total Hold Amount", str(summary_hold)]) # sheet.append(["Amount With TDS", str(total_payment_amount + total_tds_payment_amount)]) # # for cell in sheet[sheet.max_row]: # cell.font = Font(bold=True) # # workbook.save(output_file) # workbook.close() # # finally: # cursor.close() # connection.close() # # return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True) @app.route('/download_pmc_report/') def download_pmc_report(pmc_no): connection = config.get_db_connection() output_folder = "static/download" output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx") if not os.path.exists(output_folder): os.makedirs(output_folder) cursor = connection.cursor(dictionary=True) try: cursor.callproc('GetContractorDetailsByPMC', [pmc_no]) contractor_info = next(cursor.stored_results()).fetchone() if not contractor_info: return "No contractor found for this PMC No", 404 cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]]) hold_types = next(cursor.stored_results()).fetchall() hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no]) invoices = next(cursor.stored_results()).fetchall() # Credit Note # Credit Note Fetch cursor.execute(""" SELECT PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount, GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, Invoice_No FROM credit_note WHERE Contractor_Id = %s """, (pmc_no,)) credit_notes = cursor.fetchall() # Build map by (PMC_No, Invoice_No) credit_note_map = {} for cn in credit_notes: key = (cn["PMC_No"], cn["Invoice_No"]) # Use correct casing! credit_note_map.setdefault(key, []).append(cn) # Track already appended credit notes appended_credit_keys = set() cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) hold_amounts = next(cursor.stored_results()).fetchall() hold_data = {} for h in hold_amounts: hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] cursor.callproc('GetAllPaymentsByPMC', [pmc_no]) all_payments = next(cursor.stored_results()).fetchall() payments_map = {} extra_payments = [] for pay in all_payments: if pay['invoice_no']: payments_map.setdefault(pay['invoice_no'], []).append(pay) else: extra_payments.append(pay) # Fetch GST Release data cursor.execute(""" SELECT PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR FROM gst_release WHERE PMC_No = %s """, (pmc_no,)) gst_releases = cursor.fetchall() gst_release_map = {} for gr in gst_releases: invoice_nos = [] if gr['Invoice_No']: cleaned = gr['Invoice_No'].replace(' ', '') if '&' in cleaned: invoice_nos = cleaned.split('&') elif ',' in cleaned: invoice_nos = cleaned.split(',') else: invoice_nos = [cleaned] for inv_no in invoice_nos: gst_release_map.setdefault(inv_no, []).append(gr) LogHelper.log_action("Download PMC Report", f"User {current_user.id} Download PMC Report'{ pmc_no}'") workbook = openpyxl.Workbook() sheet = workbook.active sheet.title = "PMC Report" # Write contractor header sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."]) sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]]) sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]]) sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]]) sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]]) sheet.append([]) base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] hold_headers = [ht['hold_type'] for ht in hold_types] payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] sheet.append(base_headers + hold_headers + payment_headers) # Style the headers header_fill=PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid") header_font=Font(bold=True) for cell in sheet[sheet.max_row]: cell.font=header_font cell.fill=header_fill seen_invoices = set() seen_gst_notes = set() processed_payments = set() for inv in invoices: invoice_no = inv["Invoice_No"] payments = payments_map.get(invoice_no, []) if invoice_no not in seen_invoices: seen_invoices.add(invoice_no) first_payment = payments[0] if payments else None row = [ pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] ] invoice_holds = hold_data.get(inv["Invoice_Id"], {}) for ht_id in hold_type_map.keys(): row.append(invoice_holds.get(ht_id, "")) row += [ inv["Final_Amount"], first_payment["Payment_Amount"] if first_payment else "", first_payment["TDS_Payment_Amount"] if first_payment else "", first_payment["Total_amount"] if first_payment else "", first_payment["UTR"] if first_payment else "" ] sheet.append(row) if first_payment: processed_payments.add(f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}") # if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes: # seen_gst_notes.add(inv["gst_invoice_no"]) # gst_payment = None # for payment in payments[1:]: # if payment['invoice_no'] == inv["gst_invoice_no"]: # gst_payment = payment # break # if not gst_payment: # gst_payment = payments_map.get(inv["gst_invoice_no"], [None])[0] # # gst_row = [ # pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"], # inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # ] # gst_row += ["" for _ in hold_headers] # gst_row += [ # inv["gst_final_amount"], # gst_payment["Payment_Amount"] if gst_payment else "", # gst_payment["TDS_Payment_Amount"] if gst_payment else "", # gst_payment["Total_amount"] if gst_payment else "", # gst_payment["UTR"] if gst_payment else "" # ] # sheet.append(gst_row) # Add GST Release Note(s) for this invoice if any if invoice_no in gst_release_map: for gr in gst_release_map[invoice_no]: gst_row = [ pmc_no, "", "", "GST Release Note", "", gr["Invoice_No"], gr["Basic_Amount"], "", "", "", "", "", "", "", "", "" ] gst_row += ["" for _ in hold_headers] gst_row += [ gr["Final_Amount"], "", "", gr["Total_Amount"], gr["UTR"] if gr["UTR"] else "" ] # ✅ Ensure proper alignment while len(gst_row) < len(base_headers) + len(hold_headers) + len(payment_headers): gst_row.append("") sheet.append(gst_row) # if gst_payment: # processed_payments.add(f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}") for payment in payments[1:]: payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}" if payment_id not in processed_payments: row = [pmc_no, "", "", "", "", payment['invoice_no']] + [""] * 10 row += ["" for _ in hold_headers] row += [ "", payment["Payment_Amount"], payment["TDS_Payment_Amount"], payment["Total_amount"], payment["UTR"] ] sheet.append(row) processed_payments.add(payment_id) for payment in extra_payments: row = [pmc_no, "", "", "", "", ""] + [""] * 10 row += ["" for _ in hold_headers] row += [ "", payment["Payment_Amount"], payment["TDS_Payment_Amount"], payment["Total_amount"], payment["UTR"] ] sheet.append(row) # Credit Note row(s) # Track already appended credit notes appended_credit_keys = set() # While writing invoices key = (pmc_no, invoice_no) if key in credit_note_map and key not in appended_credit_keys: for cn in credit_note_map[key]: credit_row = [ pmc_no, "", "", cn.get("Invoice_Details", "Credit Note"), "", cn.get("Invoice_No", ""), cn.get("Basic_Amount", ""), cn.get("Debit_Amount", ""), cn.get("After_Debit_Amount", ""), cn.get("GST_Amount", ""), cn.get("Amount", ""), "", "", "", "", "" ] credit_row += ["" for _ in hold_headers] credit_row += [ cn.get("Final_Amount", ""), cn.get("Total_Amount", ""), cn.get("UTR", "") ] sheet.append(credit_row) appended_credit_keys.add(key) # Totals total_basic_amount = Decimal('0.00') total_tds_amount = Decimal('0.00') total_sd_amount = Decimal('0.00') total_on_commission = Decimal('0.00') total_final_amount = Decimal('0.00') total_payment_amount = Decimal('0.00') total_tds_payment_amount = Decimal('0.00') total_total_paid = Decimal('0.00') total_hold_amount_dynamic = Decimal('0.00') for row in sheet.iter_rows(min_row=8, max_row=sheet.max_row, values_only=True): try: total_basic_amount += Decimal(str(row[6] or 0)) total_tds_amount += Decimal(str(row[11] or 0)) total_sd_amount += Decimal(str(row[12] or 0)) total_on_commission += Decimal(str(row[13] or 0)) total_final_amount += Decimal(str(row[-5] or 0)) total_payment_amount += Decimal(str(row[-4] or 0)) total_tds_payment_amount += Decimal(str(row[-3] or 0)) total_total_paid += Decimal(str(row[-2] or 0)) for i in range(len(base_headers), len(base_headers) + len(hold_headers)): total_hold_amount_dynamic += Decimal(str(row[i] or 0)) except: continue # totals_row = [ # "TOTAL", "", "", "", "", "", # total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, # total_on_commission, "", "" # ] # if total_hold_amount_dynamic: # totals_row += [total_hold_amount_dynamic] + [""] * (len(hold_headers) - 1) # totals_row += [ # total_final_amount, # total_payment_amount, # total_tds_payment_amount, # total_total_paid, # "" # ] # Prepare empty totals_row with length of base_headers totals_row = [""] * len(base_headers) # Fill in specific columns totals_row[0] = "TOTAL" # Column 0: Label totals_row[6] = total_basic_amount # Column 6: Basic Amount totals_row[11] = total_tds_amount # Column 11: TDS totals_row[12] = total_sd_amount # Column 12: SD totals_row[13] = total_on_commission # Column 13: On Commission # Add hold header totals hold_values = ["" for _ in hold_headers] if total_hold_amount_dynamic: hold_values[0] = total_hold_amount_dynamic # Only in first column totals_row += hold_values # Add payment section totals_row += [ total_final_amount, total_payment_amount, total_tds_payment_amount, total_total_paid, "" # UTR ] sheet.append([]) sheet.append(totals_row) # Summary # summary_hold = Decimal('0.00') # for d in invoices: # summary_hold += Decimal(str(d.get('SD_Amount', 0))) + Decimal(str(d.get('On_Commission', 0))) + Decimal(str(d.get('Hydro_Testing', 0))) # for h in hold_amounts: # summary_hold += Decimal(str(h.get('hold_amount', 0))) sheet.append([]) today = datetime.today().strftime('%A, %Y-%m-%d') sheet.append(["Contractor Name", contractor_info["Contractor_Name"]]) sheet.append(["Date", today]) sheet.append(["Description", "Amount"]) sheet.append(["Advance/Surplus", str(total_final_amount - total_total_paid)]) sheet.append(["Total Hold Amount", str(total_hold_amount_dynamic)]) sheet.append(["Amount With TDS", str(total_tds_amount)]) for cell in sheet[sheet.max_row]: cell.font = Font(bold=True) workbook.save(output_file) workbook.close() finally: cursor.close() connection.close() return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True) # --------- Hold Types Controller -------------------------------------------- # Route to Add a New Hold Type @app.route('/add_hold_type', methods=['POST', 'GET']) def add_hold_type(): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) try: # Fetch all hold types using the stored procedure cursor.callproc("GetAllHoldTypes") hold_types = [] for hold in cursor.stored_results(): hold_types = hold.fetchall() if request.method == 'POST': hold_type = request.form.get('hold_type', '').strip() # Validation: Must start with a letter if not hold_type or not hold_type[0].isalpha(): return jsonify({"status": "error", "message": "Hold Type must start with a letter."}), 400 # Validation: Check if it already exists (case-insensitive) # cursor.execute("SELECT COUNT(*) AS count FROM hold_types WHERE LOWER(hold_type) = LOWER(%s)", (hold_type,)) # if cursor.fetchone()['count'] > 0: # return jsonify({"status": "error", "message": "This Hold Type already exists."}), 400 # Call the procedure to check if the hold_type exists cursor.callproc('CheckHoldTypeExists', [hold_type]) try: # Insert new hold type into the database # cursor.execute("INSERT INTO hold_types (hold_type) VALUES (%s)", (hold_type,)) # connection.commit() cursor.callproc('SaveHoldType', [hold_type]) connection.commit() return jsonify({"status": "success", "message": "Hold Type added successfully!"}), 201 except mysql.connector.Error as e: connection.rollback() return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500 except mysql.connector.Error as e: return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500 finally: cursor.close() connection.close() return render_template('add_hold_type.html', Hold_Types_data=hold_types) # Route to Update Hold Type # @app.route('/update_hold_type/', methods=['POST', 'GET']) # def update_hold_type(id): # # GET request: Show the form with the current hold type # if request.method == 'GET': # connection = config.get_db_connection() # cursor = connection.cursor() # # cursor.execute("SELECT * FROM hold_types WHERE hold_type_id = %s", (id,)) # # hold_type = cursor.fetchone() # # cursor.callproc("GetHoldTypesById", (id,)) # for hold in cursor.stored_results(): # hold_type = hold.fetchone() # # cursor.close() # connection.close() # # if not hold_type: # return jsonify({'status': 'error', 'message': 'Hold Type not found.'}), 404 # # return render_template('edit_hold_type.html', hold_type=hold_type) # # # POST request: Update the hold type # if request.method == 'POST': # new_hold_type = request.form.get('hold_type').strip() # # # Validation: Must start with a letter # if not new_hold_type or not new_hold_type[0].isalpha(): # return jsonify(ResponseHandler.invalid_name('Hold Type')), 400 # # connection = config.get_db_connection() # cursor = connection.cursor() # # try: # # Check if the hold type exists before updating # # cursor.execute("SELECT * FROM hold_types WHERE hold_type_id = %s", (id,)) # # hold_type = cursor.fetchone() # cursor.callproc("GetHoldTypesById", (id,)) # for hold in cursor.stored_results(): # hold_type = hold.fetchone() # # if not hold_type: # return jsonify({'status': 'error', 'message': 'Hold Type not found.'}), 404 # # # Update the hold type # # cursor.execute("UPDATE hold_types SET hold_type = %s WHERE hold_type_id = %s", (new_hold_type, id)) # cursor.callproc("UpdateHoldTypeById", (id,new_hold_type)) # connection.commit() # return jsonify(ResponseHandler.update_success('Hold Type')) # # except mysql.connector.Error as e: # connection.rollback() # return jsonify(ResponseHandler.update_failure('Hold Type')), 500 # finally: # cursor.close() # connection.close() @app.route('/update_hold_type/', methods=['GET', 'POST']) def update_hold_type(id): connection = config.get_db_connection() cursor = connection.cursor() if request.method == 'GET': cursor.callproc("GetHoldTypesById", (id,)) for hold in cursor.stored_results(): hold_type = hold.fetchone() cursor.close() connection.close() if not hold_type: flash('Hold Type not found.', 'error') return redirect(url_for('add_hold_type')) return render_template('edit_hold_type.html', hold_type=hold_type) elif request.method == 'POST': new_hold_type = request.form.get('hold_type', '').strip() if not new_hold_type or not new_hold_type[0].isalpha(): flash('Invalid hold type name. Must start with a letter.', 'error') return redirect(url_for('add_hold_type')) try: cursor.callproc("GetHoldTypesById", (id,)) for h in cursor.stored_results(): hold_type = h.fetchone() if not hold_type: flash('Hold Type not found.', 'error') return redirect(url_for('add_hold_type')) cursor.callproc("UpdateHoldTypeById", (id, new_hold_type)) connection.commit() flash('Hold Type updated successfully!', 'success') except mysql.connector.Error as e: connection.rollback() flash('Failed to update Hold Type.', 'error') finally: cursor.close() connection.close() return redirect(url_for('add_hold_type')) # Route to Delete Hold Type @app.route('/delete_hold_type/', methods=['POST']) def delete_hold_type(id): connection = config.get_db_connection() cursor = connection.cursor() try: # cursor.execute("SELECT * FROM hold_types WHERE hold_type_id = %s", (id,)) # hold_type = cursor.fetchone() cursor.callproc("GetHoldTypesById", (id,)) for hold in cursor.stored_results(): hold_type = hold.fetchone() LogHelper.log_action("Delete hold type", f"User {current_user.id} Delete hold type'{ hold_type}'") if not hold_type: return jsonify({'status': 'error', 'message': 'Hold Type not found.'}), 404 # Proceed with deletion # cursor.execute("DELETE FROM hold_types WHERE hold_type_id = %s", (id,)) cursor.callproc("DeleteHoldType", (id,)) connection.commit() return jsonify(ResponseHandler.delete_success('Hold Type')) except mysql.connector.Error as e: return jsonify(ResponseHandler.delete_failure('Hold Type')), 500 finally: cursor.close() connection.close() @app.route('/unreleased_gst') def unreleased_gst(): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) try: # Step 1: Fetch invoices cursor.execute(""" SELECT i.PMC_No, i.Invoice_No, i.GST_SD_Amount, i.Invoice_Details, i.Basic_Amount, i.Final_Amount FROM `invoice` i """) invoices = cursor.fetchall() # Step 2: Fetch GST releases cursor.execute(""" SELECT Invoice_No, Basic_Amount ,Final_Amount FROM gst_release """) gst_releases = cursor.fetchall() # Step 3: Lookup sets gst_invoice_nos = {g['Invoice_No'] for g in gst_releases if g['Invoice_No']} gst_basic_amounts = {float(g['Basic_Amount']) for g in gst_releases if g['Basic_Amount'] is not None} # Step 4: Filter unreleased = [] for inv in invoices: match_by_invoice = inv['Invoice_No'] in gst_invoice_nos match_by_gst_amount = float(inv.get('GST_SD_Amount') or 0.0) in gst_basic_amounts if not (match_by_invoice or match_by_gst_amount): unreleased.append(inv) return render_template("unreleased_gst.html", data=unreleased) finally: cursor.close() connection.close() # -- end hold types controlller -------------------- # -- end hold types controlller -------------------- # Route to display the HTML form @app.route('/add_work_order', methods=['GET']) def add_work_order(): # Add database connection connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT Contractor_id, Contractor_Name FROM subcontractors") # Adjust table/column names as needed subcontractor = cursor.fetchall() cursor.close() connection.close() return render_template('add_work_order.html', subcontractor=subcontractor) # This is your HTML form page # Route to handle form submission (from action="/submit_work_order") @app.route('/submit_work_order', methods=['POST', 'GET']) def submit_work_order(): vendor_name = request.form['vendor_name'] work_order_type = request.form['work_order_type'] work_order_amount = request.form['work_order_amount'] boq_amount = request.form['boq_amount'] work_done_percentage = request.form['work_done_percentage'] work_order_number = request.form['work_order_number'] gst_amount = request.form['gst_amount'] tds_amount = request.form['tds_amount'] security_deposite = request.form['security_deposite'] sd_against_gst = request.form['sd_against_gst'] final_total = request.form['final_total'] tds_of_gst = request.form['tds_of_gst'] LogHelper.log_action("Submit Work Order", f"User {current_user.id} Submit Work Order'{ work_order_type}'") # print("Good Morning How are U") connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) # print("Good morning and how are you") insert_query = """ INSERT INTO work_order (vendor_name, work_order_type, work_order_amount, boq_amount, work_done_percentage,work_order_number,gst_amount,tds_amount ,security_deposit,sd_against_gst,final_total,tds_of_gst) VALUES (%s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s) """ cursor.execute(insert_query, (vendor_name, work_order_type, work_order_amount, boq_amount, work_done_percentage, work_order_number , gst_amount, tds_amount, security_deposite, sd_against_gst, final_total, tds_of_gst)) connection.commit() # ✅ Fetch all data after insert select_query = "SELECT * FROM work_order" cursor.execute(select_query) wo = cursor.fetchall() # print("The Work order data is ",wo) print("The data from work order ", wo) # should now print the data properly cursor.execute("SELECT Contractor_id, Contractor_Name FROM subcontractors") # Adjust table/column names as needed subcontractor = cursor.fetchall() cursor.close() connection.close() return render_template('add_work_order.html', work_order_type=work_order_type, wo=wo, subcontractor=subcontractor) @app.route('/delete_work_order/', methods=['POST']) def delete_work_order(id): connection = config.get_db_connection() cursor = connection.cursor() cursor.execute("DELETE FROM work_order WHERE work_order_id = %s", (id,)) connection.commit() cursor.close() connection.close() LogHelper.log_action("delete Work Order", f"User {current_user.id} delete Work Order'{ id}'") return jsonify({'success': True}) @app.route('/update_work_order/', methods=['GET', 'POST']) def update_work_order(id): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) if request.method == 'POST': work_order_type = request.form['work_order_type'] work_order_amount = request.form['work_order_amount'] boq_amount = request.form['boq_amount'] work_done_percentage = request.form['work_done_percentage'] work_order_number = request.form['work_order_number'] gst_amount = request.form['gst_amount'] tds_amount = request.form['tds_amount'] security_deposite = request.form['security_deposite'] sd_against_gst = request.form['sd_against_gst'] final_amount = request.form['final_amount'] tds_of_gst = request.form['tds_of_gst'] update_query = """ UPDATE work_order SET work_order_type = %s, work_order_amount = %s, boq_amount = %s, work_done_percentage = %s, work_order_number= %s, gst_amount = %s, tds_amount= %s, security_deposite= %s, sd_against_gst=%s, final_amount= %s, tds_of_gst=%s WHERE work_order_id = %s """ cursor.execute(update_query, ( work_order_type, work_order_amount, boq_amount, work_done_percentage, work_order_number, gst_amount, tds_amount, security_deposite, sd_against_gst, final_amount, tds_of_gst, id)) connection.commit() cursor.close() connection.close() # If GET request: fetch the existing record cursor.execute("SELECT * FROM work_order WHERE work_order_id = %s", (id,)) work_order = cursor.fetchone() cursor.close() connection.close() return render_template('update_work_order.html', work_order=work_order) # Optional: Route to show a success message @app.route('/success') def success(): return "Work Order Submitted Successfully!" logging.basicConfig(level=logging.DEBUG) @app.route('/add_purchase_order', methods=['GET', 'POST']) def add_purchase_order(): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) if request.method == 'POST': # Fetch form fields purchase_date = request.form.get('purchase_date') supplier_name = request.form.get('supplier_name') purchase_order_no = request.form.get('purchase_order_no') item_name = request.form.get('item_name') quantity = request.form.get('quantity') unit = request.form.get('unit') rate = request.form.get('rate') amount = request.form.get('amount') GST_Amount = request.form.get('GST_Amount') TDS = request.form.get('TDS') final_amount = request.form.get('final_amount') LogHelper.log_action("Add purchase order", f"User {current_user.id} Added puirchase Order'{ purchase_order_no}'") # Insert into database insert_query = """ INSERT INTO purchase_order ( purchase_date, supplier_name, purchase_order_no, item_name, quantity, unit, rate, amount, GST_Amount, TDS, final_amount ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ cursor.execute(insert_query, ( purchase_date, supplier_name, purchase_order_no, item_name, quantity, unit, rate, amount, GST_Amount, TDS, final_amount )) connection.commit() # ✅ Always fetch updated data cursor.execute("SELECT * FROM purchase_order") purchases = cursor.fetchall() cursor.close() connection.close() return render_template('add_purchase_order.html', purchases=purchases) # Show all purchases @app.route('/purchase_orders') def show_purchase_orders(): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT * FROM purchase_order") purchases = cursor.fetchall() cursor.close() connection.close() return render_template('add_purchase_order.html', purchases=purchases) # Delete purchase order @app.route('/delete_purchase/', methods=['POST']) def delete_purchase(id): connection = config.get_db_connection() cursor = connection.cursor() cursor.execute("DELETE FROM purchase_order WHERE purchase_id = %s", (id,)) connection.commit() cursor.close() connection.close() LogHelper.log_action("Delete purchase order", f"User {current_user.id} Deleted puirchase Order'{ id}'") return render_template(('add_purchase_order.html')) # Edit purchase order (form + update logic) @app.route('/update_purchase/', methods=['GET', 'POST']) def update_purchase(id): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) if request.method == 'POST': # ✅ Form submitted - update all fields data = request.form cursor.execute(""" UPDATE purchase_order SET purchase_date = %s, supplier_name = %s, purchase_order_no = %s, item_name = %s, quantity = %s, unit = %s, rate = %s, amount = %s, GST_Amount = %s, TDS = %s, final_amount = %s WHERE purchase_id = %s """, ( data['purchase_date'], data['supplier_name'], data['purchase_order_no'], data['item_name'], data['quantity'], data['unit'], data['rate'], data['amount'], data['GST_Amount'], data['TDS'], data['final_amount'], id )) connection.commit() cursor.close() connection.close() LogHelper.log_action("Delete purchase order", f"User {current_user.id} Deleted puirchase Order'{ id}'") return redirect(url_for('show_purchase_orders')) # Show edit form cursor.execute("SELECT * FROM purchase_order WHERE purchase_id = %s", (id,)) purchase = cursor.fetchone() cursor.close() connection.close() return render_template('edit_purchase.html', purchase=purchase) # SHOW all GRNs + ADD form @app.route('/grn', methods=['GET']) def grn_page(): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) # Fetch purchase orders for dropdown cursor.execute("SELECT purchase_id, supplier_name FROM purchase_order") purchase_orders = cursor.fetchall() # Fetch all GRNs to display cursor.execute("SELECT * FROM goods_receive_note") grns = cursor.fetchall() print(grns) cursor.close() connection.close() # Render the template with both datasets return render_template('grn_form.html', purchase_orders=purchase_orders, grns=grns) # ADD new GRN @app.route('/add_grn', methods=['POST', 'GET']) def add_grn(): data = request.form connection = config.get_db_connection() cursor = connection.cursor() query = """ INSERT INTO goods_receive_note (grn_date, purchase_id, supplier_name, item_description, received_quantity, unit, rate, amount, remarks) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) \ """ cursor.execute(query, ( data.get('grn_date'), data.get('purchase_id'), data.get('supplier_name'), data.get('item_description'), data.get('received_quantity'), data.get('unit'), data.get('rate'), data.get('amount'), data.get('remarks') )) connection.commit() cursor.execute("SELECT * FROM goods_receive_note") grns = cursor.fetchall() print(grns) query = "select * from purchase_order" cursor.execute(query) purchase_orders = cursor.fetchall() cursor.close() connection.close() return render_template('grn_form.html', purchase_orders=purchase_orders, grns=grns) # UPDATE GRN @app.route('/update_grn/', methods=['GET', 'POST']) def update_grn(grn_id): connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) if request.method == 'POST': data = request.form query = """ UPDATE goods_receive_note SET grn_date=%s, purchase_id=%s, supplier_name=%s, item_description=%s, received_quantity=%s, unit=%s, rate=%s, amount=%s, remarks=%s WHERE grn_id=%s """ cursor.execute(query, ( data['grn_date'], data['purchase_id'], data['supplier_name'], data['item_description'], data['received_quantity'], data['unit'], data['rate'], data['amount'], data['remarks'], grn_id )) connection.commit() cursor.close() connection.close() return redirect(url_for('grns')) cursor.execute("SELECT * FROM goods_receive_note WHERE grn_id = %s", (grn_id,)) grn = cursor.fetchone() cursor.close() connection.close() return render_template("edit_grn.html", grn=grn) # DELETE GRN @app.route('/delete_grn/', methods=['POST']) def delete_grn(grn_id): connection = config.get_db_connection() cursor = connection.cursor() cursor.execute("DELETE FROM goods_receive_note WHERE grn_id = %s", (grn_id,)) connection.commit() cursor.close() connection.close() return render_template("grn_form.html") @app.route('/work_order_report', methods=['GET']) def work_order_report(): return render_template('work_order_report.html') # ✅ Vendor Name Search (for Select2) @app.route('/get_vendor_names') def get_vendor_names(): query = request.args.get('q', '') connection = config.get_db_connection() cursor = connection.cursor() cursor.execute("SELECT DISTINCT vendor_name FROM work_order WHERE vendor_name LIKE %s", (f"%{query}%",)) vendors = [row[0] for row in cursor.fetchall()] cursor.close() connection.close() return jsonify(vendors) # ✅ Work Order Number Search (with or without vendor) @app.route('/get_work_order_numbers') def get_work_order_numbers(): vendor = request.args.get('vendor_name', '') query = request.args.get('q', '') connection = config.get_db_connection() cursor = connection.cursor() if vendor: cursor.execute( "SELECT DISTINCT work_order_number FROM work_order WHERE vendor_name = %s AND work_order_number LIKE %s", (vendor, f"%{query}%")) else: cursor.execute("SELECT DISTINCT work_order_number FROM work_order WHERE work_order_number LIKE %s", (f"%{query}%",)) orders = [row[0] for row in cursor.fetchall()] cursor.close() connection.close() return jsonify(orders) # ✅ Get Work Order Data (Filtered) @app.route('/get_work_order_data') def get_work_order_data(): vendor = request.args.get('vendor_name') order_number = request.args.get('work_order_number') query = "SELECT * FROM work_order WHERE 1=1" params = [] if vendor: query += " AND vendor_name = %s" params.append(vendor) if order_number: query += " AND work_order_number = %s" params.append(order_number) connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute(query, tuple(params)) data = cursor.fetchall() cursor.close() connection.close() return jsonify(data) @app.route('/download_work_order_report') def download_work_order_report(): vendor_name = request.args.get('vendor_name') work_order_number = request.args.get('work_order_number') if work_order_number == "null": work_order_number = None query = "SELECT * FROM work_order WHERE 1=1" params = [] if vendor_name: query += " AND vendor_name = %s" params.append(vendor_name) if work_order_number: query += " AND work_order_number = %s" params.append(work_order_number) conn = config.get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(query, tuple(params)) data = cursor.fetchall() cursor.close() conn.close() if not data: return "No data found for the selected filters", 404 # Convert to DataFrame df = pd.DataFrame(data) output_path = 'static/downloads/work_order_report.xlsx' os.makedirs(os.path.dirname(output_path), exist_ok=True) df.to_excel(output_path, index=False, startrow=2) # Leave space for heading # Load workbook for styling wb = load_workbook(output_path) ws = wb.active # Add a merged title title = "Work Order Report" ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(df.columns)) title_cell = ws.cell(row=1, column=1) title_cell.value = title title_cell.font = Font(size=14, bold=True, color="1F4E78") title_cell.alignment = Alignment(horizontal="center", vertical="center") # Style header row (row 3 because data starts from row 3) header_font = Font(bold=True) for col_num, column_name in enumerate(df.columns, 1): cell = ws.cell(row=3, column=col_num) cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Optional: adjust column width max_length = max(len(str(column_name)), 12) ws.column_dimensions[get_column_letter(col_num)].width = max_length + 2 wb.save(output_path) return send_file(output_path, as_attachment=True) @app.route('/purchase_order_report', methods=['GET']) def purchase_order_report(): return render_template('purchase_order_report.html') @app.route('/get_supplier_names') def get_supplier_names(): query = request.args.get('q', '') # Get the search term from Select2 connection = config.get_db_connection() cursor = connection.cursor() # Fetch distinct supplier names that match the search query cursor.execute( "SELECT supplier_name FROM purchase_order WHERE supplier_name LIKE %s", (f"%{query}%",) ) suppliers = [row[0] for row in cursor.fetchall()] cursor.close() connection.close() return jsonify(suppliers) @app.route('/get_purchase_order_numbers') def get_purchase_order_numbers(): supplier = request.args.get('supplier_name', '') query = request.args.get('q', '') connection = config.get_db_connection() cursor = connection.cursor() if supplier: cursor.execute(""" SELECT purchase_order_no FROM purchase_order WHERE supplier_name = %s AND purchase_order_no LIKE %s """, (supplier, f"%{query}%")) else: cursor.execute(""" SELECT purchase_order_no FROM purchase_order WHERE purchase_order_no LIKE %s """, (f"%{query}%",)) orders = [row[0] for row in cursor.fetchall()] cursor.close() connection.close() return jsonify(orders) @app.route('/get_purchase_order_data') def get_purchase_order_data(): supplier = request.args.get('supplier_name') order_no = request.args.get('purchase_order_no') query = "SELECT * FROM purchase_order WHERE 1=1" params = [] if supplier: query += " AND supplier_name = %s" params.append(supplier) if order_no: query += " AND purchase_order_no = %s" params.append(order_no) connection = config.get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute(query, tuple(params)) data = cursor.fetchall() cursor.close() connection.close() return jsonify(data) @app.route('/download_purchase_order_report') def download_purchase_order_report(): supplier_name = request.args.get('supplier_name') purchase_order_no = request.args.get('purchase_order_no') if purchase_order_no == "null": purchase_order_no = None LogHelper.log_action("Download purchase order", f"User {current_user.id} Download puirchase Order'{ purchase_order_no}'") query = "SELECT * FROM purchase_order WHERE 1=1" params = [] if supplier_name: query += " AND supplier_name = %s" params.append(supplier_name) if purchase_order_no: query += " AND purchase_order_no = %s" params.append(purchase_order_no) conn = config.get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(query, tuple(params)) data = cursor.fetchall() cursor.close() conn.close() if not data: return "No data found for the selected filters", 404 # Convert to DataFrame df = pd.DataFrame(data) output_path = 'static/downloads/purchase_order_report.xlsx' os.makedirs(os.path.dirname(output_path), exist_ok=True) df.to_excel(output_path, index=False, startrow=2) # Reserve space for heading # Load workbook for styling wb = load_workbook(output_path) ws = wb.active # Add a merged title title = "Purchase Order Report" ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(df.columns)) title_cell = ws.cell(row=1, column=1) title_cell.value = title title_cell.font = Font(size=14, bold=True, color="1F4E78") title_cell.alignment = Alignment(horizontal="center", vertical="center") # Style header row (row 3 because data starts from row 3) header_font = Font(bold=True) for col_num, column_name in enumerate(df.columns, 1): cell = ws.cell(row=3, column=col_num) cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") max_length = max(len(str(column_name)), 12) ws.column_dimensions[get_column_letter(col_num)].width = max_length + 2 wb.save(output_path) return send_file(output_path, as_attachment=True) if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=True)