Files
Version-1-2/main.py

5020 lines
204 KiB
Python

from decimal import Decimal
from AppCode import ContractorInfo, Auth, Utilities, Log
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper
from AppCode.Auth import LoginLDAP, User
from AppCode.Log import LogData, LogHelper
from AppCode.State import State
from AppCode.District import District
from AppCode.Block import Block
from AppCode.Village import Village
# TODO: consolidate and tidy the import lines above.
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app, session, send_file
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import mysql.connector
from mysql.connector import Error
import config
import os
import re
import ast
from datetime import datetime
import openpyxl
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
import logging
import pandas as pd
#import AppRoutes.StateRoute
# this is server
# Create the Flask application object that every route below is registered on.
app = Flask(__name__)
# Attach Flask-Login to the app.
login_manager = LoginManager()
login_manager.init_app(app)
# 'login' is the endpoint name of the login() view defined further down;
# Flask-Login uses login_view as the place to send unauthenticated users.
login_manager.login_view = 'login'
@login_manager.user_loader
def load_user(user_id):
    """Build the User object for the id Flask-Login hands back.

    Registered through ``login_manager.user_loader``; the id is passed to the
    ``User`` constructor unchanged.
    """
    user = User(user_id)
    return user
# TODO(review): confirm how Flask-Login uses the user loader above.
# Session-signing secret. Prefer the FLASK_SECRET_KEY environment variable so
# the real secret is not kept in source control; the literal below is only a
# backward-compatible fallback and should be overridden in every deployment.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', '9f2a1b8c4d6e7f0123456789abcdef01')
# Index (home) page.
@app.route('/')
@login_required
def index():
    """Render the home page; only reachable by a logged-in user."""
    template_name = 'index.html'
    return render_template(template_name)
# ---------------- LOGIN ROUTE ----------------
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or process a login attempt (POST).

    A POST builds a ``LoginLDAP`` from the request. When its ``isValidLogin``
    flag is set, the login is written to the activity log, the username is
    stored in the session, the user is logged in through Flask-Login and the
    browser is redirected to the index page. Otherwise the object's
    ``errorMessage`` is flashed and the login form is rendered again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    attempt = LoginLDAP(request)
    if not attempt.isValidLogin:
        flash(attempt.errorMessage, 'danger')
        return render_template('login.html')

    # Record which kind of credentials were accepted.
    if attempt.isDefaultCredentials:
        LogHelper.log_action('Login', f"User {attempt.username} logged in (static user)")
    else:
        LogHelper.log_action('Login', f"User {attempt.username} logged in (LDAP)")

    session['username'] = attempt.username
    login_user(User(attempt.username))
    return redirect(url_for('index', login='success'))
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send them back to the login page."""
    # Write the log entry first: current_user.id is read before logout_user().
    log_message = f"User {current_user.id} logged out"
    LogHelper.log_action('Logout', log_message)
    logout_user()
    flash('You have been logged out.', 'info')
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/activity_log', methods=['GET', 'POST'])
@login_required
def activity_log():
    """Render the activity log, optionally filtered by date range and user.

    The three filters are read from ``request.values`` (query string or form)
    and echoed back to the template along with the filtered log entries.
    """
    filter_names = ("start_date", "end_date", "username")
    filters = {name: request.values.get(name) for name in filter_names}

    entries = LogData().GetFilteredActivitiesLog(
        filters["start_date"], filters["end_date"], filters["username"]
    )
    # The filter dict keys match the template variable names exactly.
    return render_template("activity_log.html", logs=entries, **filters)
# ------------------------- State controller ------------------------------------------
@app.route('/add_state', methods=['GET', 'POST'])
@login_required
def add_state():
    """Add a state (POST) or render the state page with all states (GET).

    POST returns the ``resultMessage`` left on the ``State`` object by
    ``AddState``.
    """
    state = State()
    if request.method != 'POST':
        all_states = state.GetAllStates(request=request)
        return render_template('add_state.html', statedata=all_states)

    state.AddState(request=request)
    return state.resultMessage
# AJAX route to check state existence
@app.route('/check_state', methods=['POST'])
@login_required
def check_state():
    """AJAX endpoint: return whatever ``State.CheckState`` produces for the request."""
    return State().CheckState(request=request)
# Delete State
@app.route('/delete_state/<int:id>', methods=['GET'])
@login_required
def deleteState(id):
    """Delete the state with the given id.

    Redirects to the add-state page when the ``State`` object reports success,
    otherwise returns its ``resultMessage``. The parameter name ``id`` is fixed
    by the URL rule.
    """
    state = State()
    state.DeleteState(request=request, id=id)
    if state.isSuccess:
        return redirect(url_for('add_state'))
    return state.resultMessage
# Edit State
@app.route('/edit_state/<int:id>', methods=['GET', 'POST'])
@login_required
def editState(id):
    """Edit a state (POST) or render its edit form (GET).

    POST: runs ``State.EditState``; redirects to the add-state page on success,
    otherwise returns the object's ``resultMessage``.
    GET: loads the state with ``State.GetStateByID`` and renders the edit
    template; returns ``resultMessage`` when the lookup reports failure.

    The previous version also opened a database connection and cursor here
    that were never used or closed; all data access goes through ``State``,
    so that leaked connection has been removed.
    """
    state = State()

    if request.method == 'POST':
        state.EditState(request=request, id=id)
        if state.isSuccess:
            return redirect(url_for('add_state'))
        return state.resultMessage

    statedata = state.GetStateByID(request=request, id=id)
    if not state.isSuccess:
        return state.resultMessage
    if statedata is None:
        # Keep the template variable iterable even when nothing was found.
        statedata = []
    return render_template('edit_state.html', state=statedata)
# -------- end State controller -----------
# ------------------------- District controller ------------------------------------------
@app.route('/add_district', methods=['GET', 'POST'])
@login_required
def add_district():
    """Add a district (POST) or render the district page (GET).

    POST returns the ``resultMessage`` set by ``District.AddDistrict``.
    GET renders the template with all states and all districts.
    """
    district = District()
    if request.method != 'POST':
        # States are loaded before districts, as the page needs both lists.
        states = State().GetAllStates(request=request)
        districtdata = district.GetAllDistricts(request=request)
        return render_template('add_district.html', districtdata=districtdata, states=states)

    district.AddDistrict(request=request)
    return district.resultMessage
# AJAX route to check district existence
@app.route('/check_district', methods=['POST'])
@login_required
def check_district():
    """AJAX endpoint: return the result of ``District.CheckDistrict`` for the request."""
    return District().CheckDistrict(request=request)
# Delete District
@app.route('/delete_district/<int:district_id>', methods=['GET'])
@login_required
def delete_district(district_id):
    """Delete a district; redirect on success, else return the result message."""
    district = District()
    district.DeleteDistrict(request=request, id=district_id)
    if district.isSuccess:
        return redirect(url_for('add_district'))
    return district.resultMessage
# Edit District
@app.route('/edit_district/<int:district_id>', methods=['GET', 'POST'])
@login_required
def edit_district(district_id):
    """Edit a district (POST) or render its edit form (GET).

    POST: on success redirects to the add-district page; on failure flashes the
    result message and re-renders the form with freshly loaded data.
    GET: flashes and redirects when the district lookup fails, otherwise
    renders the form, substituting empty lists for ``None`` results.
    """
    district = District()
    template = 'edit_district.html'

    if request.method != 'POST':
        districtdata = district.GetDistrictByID(request=request, id=district_id)
        if not district.isSuccess:
            flash(district.resultMessage, "error")
            return redirect(url_for('add_district'))

        states = State().GetAllStates(request=request)
        # Keep both template variables iterable.
        districtdata = [] if districtdata is None else districtdata
        states = [] if states is None else states
        return render_template(template, districtdata=districtdata, states=states)

    district.EditDistrict(request=request, id=district_id)
    if district.isSuccess:
        return redirect(url_for('add_district'))

    # Update failed: show the error on the same form.
    flash(district.resultMessage, "error")
    districtdata = district.GetDistrictByID(request=request, id=district_id)
    states = State().GetAllStates(request=request)
    return render_template(template, districtdata=districtdata, states=states)
# --------- end District controller -------------
# ------------------------- Block controller ------------------------------------------
@app.route('/add_block', methods=['GET', 'POST'])
@login_required
def add_block():
    """Add a block (POST) or render the block page (GET).

    POST returns the ``resultMessage`` set by ``Block.AddBlock``.
    GET loads the states through the ``GetAllStates`` stored procedure and the
    blocks through ``Block.GetAllBlocks`` and renders the template.

    Fixes over the previous version: the connection and cursor are now always
    closed, ``states`` defaults to an empty list when the procedure yields no
    result set, and an unused ``District`` instance is no longer created.
    """
    block = Block()

    if request.method == 'POST':
        block.AddBlock(request)
        return block.resultMessage

    states = []
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.callproc("GetAllStates")
        # Keep the rows of the last result set the procedure returned.
        for rs in cursor.stored_results():
            states = rs.fetchall()
    finally:
        cursor.close()
        connection.close()

    block_data = block.GetAllBlocks()
    return render_template('add_block.html', states=states, block_data=block_data)
@app.route('/check_block', methods=['POST'])
@login_required
def check_block():
    """AJAX endpoint: return the result of ``Block.CheckBlock`` for the request."""
    return Block().CheckBlock(request)
@app.route('/edit_block/<int:block_id>', methods=['GET', 'POST'])
@login_required
def edit_block(block_id):
    """Edit a block (POST) or render its edit form (GET).

    POST returns whatever ``Block.EditBlock`` returns.
    GET loads all states (``GetAllStates``) and all districts
    (``GetAllDistrictsData``) through stored procedures, loads the block with
    ``Block.GetBlockByID`` and renders the edit template.

    Fixes over the previous version: the connection and cursor are now always
    closed, and ``states`` / ``districts`` default to empty lists when a
    procedure yields no result set.
    """
    block = Block()

    if request.method == 'POST':
        return block.EditBlock(request, block_id)

    states = []
    districts = []
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.callproc("GetAllStates")
        for rs in cursor.stored_results():
            states = rs.fetchall()

        cursor.callproc("GetAllDistrictsData")
        for rs in cursor.stored_results():
            districts = rs.fetchall()
    finally:
        cursor.close()
        connection.close()

    block_data = block.GetBlockByID(block_id)
    return render_template('edit_block.html', block_data=block_data, states=states, districts=districts)
@app.route('/delete_block/<int:block_id>')
@login_required
def delete_block(block_id):
    """Delete the block with the given id, then return to the add-block page."""
    Block().DeleteBlock(block_id)
    target = url_for('add_block')
    return redirect(target)
# get block by district id
@app.route('/get_blocks/<int:district_id>', methods=['GET'])
@login_required
def get_blocks(district_id):
    """Return the blocks of one district as JSON.

    Calls the ``GetBlocksByDistrict`` stored procedure and maps the first two
    columns of each row to ``Block_Id`` and ``Block_Name``. A database error
    yields a JSON error response with status 500.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    rows = []
    try:
        cursor.callproc("GetBlocksByDistrict", (district_id,))
        for result_set in cursor.stored_results():
            rows = result_set.fetchall()
    except mysql.connector.Error as e:
        print(f"Error fetching blocks: {e}")
        return HtmlHelper.json_response({"error": "Failed to fetch blocks"}, 500)
    finally:
        cursor.close()
        connection.close()

    payload = []
    for row in rows:
        payload.append({"Block_Id": row[0], "Block_Name": row[1]})
    return jsonify({"blocks": payload})
# Get all districts for the given state id.
@app.route('/get_districts/<int:state_id>', methods=['GET'])
@login_required
def get_districts(state_id):
    """Return the districts of one state as JSON.

    Calls the ``GetDistrictsByStateId`` stored procedure, logs the lookup and
    maps the first two columns of each row to ``District_id`` and
    ``District_Name``. Without a database connection the list is empty; a
    database error yields a JSON error response with status 500.
    """
    connection = config.get_db_connection()
    if not connection:
        return jsonify({"districts": []})

    rows = []
    cursor = connection.cursor()
    try:
        cursor.callproc("GetDistrictsByStateId", (state_id,))
        for result_set in cursor.stored_results():
            rows = result_set.fetchall()
        LogHelper.log_action("Get District", f"User {current_user.id} Get District '{state_id}'")
    except mysql.connector.Error as e:
        print(f"Error fetching districts: {e}")
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("districts"), 500)
    finally:
        cursor.close()
        connection.close()

    payload = []
    for row in rows:
        payload.append({"District_id": row[0], "District_Name": row[1]})
    return jsonify({"districts": payload})
# ----------- end Block controller -----------------
# ------------------------- Village controller ------------------------------------------
# Route to add a village
@app.route('/add_village', methods=['GET', 'POST'])
@login_required
def add_village():
    """Add a village (POST) or render the village page (GET).

    POST returns the ``resultMessage`` set by ``Village.AddVillage``.
    GET renders the template with all states and all villages.
    """
    village = Village()
    if request.method != 'POST':
        states = State().GetAllStates(request=request)
        villages = village.GetAllVillages(request=request)
        return render_template('add_village.html', states=states, villages=villages)

    village.AddVillage(request=request)
    return village.resultMessage
@app.route('/check_village', methods=['POST'])
@login_required
def check_village():
    """AJAX endpoint: return the result of ``Village.CheckVillage`` for the request."""
    return Village().CheckVillage(request=request)
# Delete Village
@app.route('/delete_village/<int:village_id>', methods=['GET'])
@login_required
def delete_village(village_id):
    """Delete a village and return to the add-village page.

    A failure reported by the ``Village`` object is flashed as an error; the
    redirect target is the same either way.
    """
    village = Village()
    village.DeleteVillage(request=request, village_id=village_id)
    if not village.isSuccess:
        flash(village.resultMessage, "error")
    return redirect(url_for('add_village'))
# Edit Village
@app.route('/edit_village/<int:village_id>', methods=['GET', 'POST'])
@login_required
def edit_village(village_id):
    """Edit a village (POST) or render its edit form (GET).

    POST: flashes the result message; redirects to the add-village page on
    success, otherwise re-renders the form with freshly loaded data.
    GET: flashes and redirects when the village lookup fails, otherwise renders
    the form, substituting empty lists for ``None`` results.
    """
    village = Village()
    template = 'edit_village.html'

    if request.method != 'POST':
        village_data = village.GetVillageByID(request=request, id=village_id)
        if not village.isSuccess:
            flash(village.resultMessage, "error")
            return redirect(url_for('add_village'))

        blocks = village.GetAllBlocks(request=request)
        # Keep both template variables iterable.
        village_data = [] if village_data is None else village_data
        blocks = [] if blocks is None else blocks
        return render_template(template, village_data=village_data, blocks=blocks)

    village.EditVillage(request=request, village_id=village_id)
    if village.isSuccess:
        flash(village.resultMessage, "success")
        return redirect(url_for('add_village'))

    # Update failed: show the error on the same form.
    flash(village.resultMessage, "error")
    village_data = village.GetVillageByID(request=request, id=village_id)
    blocks = village.GetAllBlocks(request=request)
    return render_template(template, village_data=village_data, blocks=blocks)
# ---- end Village controller ---------------------
# -------------------------------- Invoice controller ------------------------------------------
@app.route('/add_invoice', methods=['GET', 'POST'])
@login_required
def add_invoice():
    """Create an invoice (POST) or render the invoice form and list (GET).

    POST:
        Resolves the submitted village name to its id (``GetVillageIdByName``),
        inserts the invoice (``InsertInvoice``), assigns the subcontractor
        (``AssignSubcontractor``) and stores each submitted hold amount
        (``GetHoldTypeIdByName`` + ``InsertInvoiceSubcontractorHold``).
        Returns a JSON status object: 201 on success, 400 for an unknown
        village, an unknown hold type or a non-numeric amount, 500 on a
        database error.

        All writes are committed once at the end, so a rejected hold type or a
        database error rolls the whole invoice back instead of leaving a
        partially stored one behind.

    GET:
        Renders ``add_invoice.html`` with all invoices
        (``GetAllInvoiceDetails``) and all villages (``GetAllVillages``); both
        fall back to empty lists when a procedure fails or returns nothing.
    """
    connection = config.get_db_connection()
    if not connection:
        return jsonify({"status": "error", "message": "Database connection failed"}), 500

    if request.method == 'POST':
        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)

            # Resolve the village name from the form to its id.
            village_name = request.form.get('village')
            village_result = None
            cursor.callproc("GetVillageIdByName", (village_name,))
            for rs in cursor.stored_results():
                village_result = rs.fetchone()
            if not village_result:
                return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
            village_id = village_result['Village_Id']

            pmc_no = request.form.get('pmc_no')
            work_type = request.form.get('work_type')
            invoice_details = request.form.get('invoice_details')
            invoice_date = request.form.get('invoice_date')
            invoice_no = request.form.get('invoice_no')

            # Amount fields, in the positional order InsertInvoice is called with.
            amount_fields = (
                'basic_amount', 'debit_amount', 'after_debit_amount', 'amount',
                'gst_amount', 'tds_amount', 'sd_amount', 'on_commission',
                'hydro_testing', 'gst_sd_amount', 'final_amount',
            )
            try:
                # A missing or empty field counts as 0.0.
                amounts = [
                    float(raw) if raw else 0.0
                    for raw in map(request.form.get, amount_fields)
                ]
            except ValueError:
                return jsonify({"status": "error", "message": "Amount fields must be numeric"}), 400

            cursor.callproc('InsertInvoice', [
                pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no,
                *amounts,
            ])
            LogHelper.log_action("Add invoice", f"User {current_user.id} Added invoice '{ pmc_no}'")

            # The procedure is read as returning the new id in an 'invoice_id' column.
            invoice_id = None
            for result in cursor.stored_results():
                invoice_id = result.fetchone()['invoice_id']
            if invoice_id is None:
                connection.rollback()
                return jsonify({"status": "error", "message": "Failed to add invoice: no invoice id returned"}), 500

            subcontractor_id = request.form.get('subcontractor_id')
            cursor.callproc('AssignSubcontractor', [pmc_no, subcontractor_id, village_id])

            # Store every (hold type, hold amount) pair submitted with the form.
            hold_types = request.form.getlist('hold_type[]')
            hold_amounts = request.form.getlist('hold_amount[]')
            for hold_type, hold_amount in zip(hold_types, hold_amounts):
                # Reset per iteration so a previous lookup can never be reused.
                hold_type_result = None
                cursor.callproc('GetHoldTypeIdByName', [hold_type])
                for result in cursor.stored_results():
                    hold_type_result = result.fetchone()
                if not hold_type_result:
                    # Undo the invoice written above; nothing has been committed yet.
                    connection.rollback()
                    return jsonify({"status": "error", "message": f"Invalid Hold Type: {hold_type}"}), 400

                cursor.callproc('InsertInvoiceSubcontractorHold', [
                    subcontractor_id, invoice_id, hold_type_result['hold_type_id'], hold_amount
                ])

            connection.commit()
            return jsonify({"status": "success", "message": "Invoice added successfully"}), 201
        except mysql.connector.Error as e:
            connection.rollback()
            return jsonify({"status": "error", "message": f"Failed to add invoice: {str(e)}"}), 500
        finally:
            if cursor is not None:
                cursor.close()
            connection.close()

    # GET request: fetch and display all invoices along with the form.
    cursor = None
    invoices = []
    villages = []
    try:
        cursor = connection.cursor(dictionary=True)
        cursor.callproc('GetAllInvoiceDetails')
        for result in cursor.stored_results():
            invoices = result.fetchall()

        cursor.callproc("GetAllVillages")
        for result in cursor.stored_results():
            villages = result.fetchall()
    except mysql.connector.Error as e:
        print(f"Error: {e}")
        invoices = []
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()

    return render_template('add_invoice.html', invoices=invoices, villages=villages)
# Search subcontractors by name so one can be assigned to an invoice.
@app.route('/search_subcontractor', methods=['POST'])
@login_required
def search_subcontractor():
    """AJAX endpoint: return matching subcontractors as an HTML ``<li>`` list.

    Reads the search text from the ``query`` form field, calls the
    ``SearchContractorsByName`` stored procedure and renders one ``<li>`` per
    row, with the contractor id in ``data-id`` and the contractor name as
    text. Returns a single "No subcontractor found" item when nothing matches,
    and a JSON error response with status 500 on a database error or a
    missing connection.

    Both values are HTML-escaped: contractor names come from the database and
    were previously written into the markup verbatim.
    """
    # Function-scope import so the block does not rely on a module-level one.
    from html import escape

    connection = config.get_db_connection()
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("database connection"), 500)

    sub_query = request.form.get("query")
    cursor = None
    results = []
    try:
        cursor = connection.cursor(dictionary=True)
        cursor.callproc('SearchContractorsByName', [sub_query])
        for result in cursor.stored_results():
            results = result.fetchall()

        if not results:
            return "<li>No subcontractor found</li>"

        # escape() also escapes quotes, which matters inside the data-id attribute.
        return "".join(
            f"<li data-id='{escape(str(row['Contractor_Id']))}'>{escape(str(row['Contractor_Name']))}</li>"
            for row in results
        )
    except mysql.connector.Error as e:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure(f"Search failed: {str(e)}"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
# get hold types
@app.route('/get_hold_types', methods=['GET'])
@login_required
def get_hold_types():
    """Return all hold types as JSON.

    Calls the ``GetAllHoldTypes`` stored procedure, logs the lookup and
    returns the rows (dictionary cursor) through ``jsonify``. A missing
    connection or a database error yields a JSON error response with status
    500, built the same way as in the sibling routes.

    The previous error path passed a set literal (``{str(e)}``) to
    ``ResponseHandler.fetch_failure`` and bypassed ``HtmlHelper.json_response``.
    """
    connection = config.get_db_connection()
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("hold types"), 500)

    cursor = None
    hold_types = []
    try:
        cursor = connection.cursor(dictionary=True)
        cursor.callproc("GetAllHoldTypes")
        for hold in cursor.stored_results():
            hold_types = hold.fetchall()
        LogHelper.log_action("Get hold type", f"User {current_user.id} Get hold type'{ hold_types}'")
        return jsonify(hold_types)
    except mysql.connector.Error as e:
        # Keep the driver error on the server side; the client gets a generic message.
        print(f"Error fetching hold types: {e}")
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("hold types"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
# update invoice by id
@app.route('/edit_invoice/<int:invoice_id>', methods=['GET', 'POST'])
@login_required
def edit_invoice(invoice_id):
    """Update an invoice (POST) or render its edit form (GET).

    POST:
        Validates the numeric form fields, resolves the village name
        (``GetVillageIdByName``), updates the invoice (``UpdateInvoice``) and
        then, for every non-empty hold type submitted, looks the type up
        (``GetHoldTypeIdByName``), creates it when missing
        (``InsertHoldType``) and updates or inserts the hold amount
        (``GetHoldJoinId`` + ``UpdateHoldAmountByJoinId`` or
        ``InsertInvoiceSubcontractorHold``).
        Returns a JSON status object: 200 on success, 400 for an unknown
        village or a non-numeric value, 500 on a database error. Everything is
        committed once at the end, so a failure rolls the whole update back.

    GET:
        Loads the invoice (``GetInvoiceDetailsById``) and its hold amounts
        (``GetHoldAmountsByInvoiceId``) and renders ``edit_invoice.html``;
        404 JSON when the invoice does not exist.
    """
    connection = config.get_db_connection()
    if not connection:
        return jsonify({"status": "error", "message": "Database connection failed"}), 500

    cursor = connection.cursor(dictionary=True)

    if request.method == 'POST':
        try:
            # Parse every numeric input up front so a bad value is rejected
            # with a 400 before anything is written.
            numeric_names = (
                'basic_amount', 'debit_amount', 'after_debit_amount', 'amount',
                'gst_amount', 'tds_amount', 'sd_amount', 'on_commission',
                'hydro_testing', 'gst_sd_amount', 'final_amount',
            )
            try:
                subcontractor_id = request.form.get('subcontractor_id', '').strip()
                subcontractor_id = int(subcontractor_id) if subcontractor_id else None
                # Same positional order UpdateInvoice is called with below.
                numeric_values = [
                    float(raw) if raw else 0
                    for raw in map(request.form.get, numeric_names)
                ]
                holds = [
                    (hold_type, float(hold_amount) if hold_amount else 0)
                    for hold_type, hold_amount in zip(
                        request.form.getlist('hold_type[]'),
                        request.form.getlist('hold_amount[]'),
                    )
                    if hold_type  # skip empty hold types
                ]
            except ValueError:
                return jsonify({"status": "error", "message": "Numeric fields must contain valid numbers"}), 400

            village_name = request.form.get('village')
            village_result = None
            cursor.callproc("GetVillageIdByName", (village_name,))
            for rs in cursor.stored_results():
                village_result = rs.fetchone()
            if not village_result:
                return jsonify({"status": "error", "message": "Invalid Village Name"}), 400
            village_id = village_result['Village_Id']

            pmc_no = request.form.get('pmc_no')
            work_type = request.form.get('work_type')
            invoice_details = request.form.get('invoice_details')
            invoice_date = request.form.get('invoice_date')
            invoice_no = request.form.get('invoice_no')
            LogHelper.log_action("Edit invoice", f"User {current_user.id} Edit invoice'{ invoice_id}'")

            cursor.callproc('UpdateInvoice', [
                pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no,
                *numeric_values, invoice_id
            ])

            for hold_type, hold_amount in holds:
                # Reset per iteration so a previous lookup can never be reused.
                hold_type_result = None
                cursor.callproc('GetHoldTypeIdByName', [hold_type])
                for result in cursor.stored_results():
                    hold_type_result = result.fetchone()

                if hold_type_result:
                    hold_type_id = hold_type_result['hold_type_id']
                else:
                    # Unknown hold type: create it. callproc() returns the
                    # argument list with OUT parameters filled in; the second
                    # argument is read as the OUT parameter carrying the new
                    # id (the previous code tried to read it back from a
                    # session variable, which failed on this dictionary cursor
                    # because the row was indexed with [0]).
                    out_args = cursor.callproc('InsertHoldType', [hold_type, 0])
                    for result in cursor.stored_results():
                        pass  # advance past any result sets
                    # A dictionary cursor returns a dict in argument order.
                    if isinstance(out_args, dict):
                        out_values = list(out_args.values())
                    else:
                        out_values = list(out_args)
                    hold_type_id = out_values[1]

                # Update the existing hold row for this invoice/contractor/type,
                # or insert a new one.
                join_result = None
                cursor.callproc('GetHoldJoinId', [invoice_id, subcontractor_id, hold_type_id])
                for result in cursor.stored_results():
                    join_result = result.fetchone()

                if join_result:
                    cursor.callproc('UpdateHoldAmountByJoinId', [hold_amount, join_result['join_id']])
                else:
                    cursor.callproc('InsertInvoiceSubcontractorHold', [
                        subcontractor_id, invoice_id, hold_type_id, hold_amount
                    ])

            connection.commit()
            return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200
        except mysql.connector.Error as e:
            connection.rollback()
            return jsonify({"status": "error", "message": f"Failed to update invoice: {str(e)}"}), 500
        finally:
            cursor.close()
            connection.close()

    # ------------------ GET Request ------------------
    invoice = None
    try:
        cursor.callproc('GetInvoiceDetailsById', [invoice_id])
        for result in cursor.stored_results():
            invoice = result.fetchone()
        if not invoice:
            return jsonify({"status": "error", "message": "Invoice not found"}), 404

        # Drain any remaining result sets before the next procedure call.
        while cursor.nextset():
            pass

        hold_amounts = []
        cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
        for result in cursor.stored_results():
            hold_amounts = result.fetchall()
        invoice["hold_amounts"] = hold_amounts
    except mysql.connector.Error as e:
        return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500
    finally:
        cursor.close()
        connection.close()

    return render_template('edit_invoice.html', invoice=invoice)
# delete invoice by id
@app.route('/delete_invoice/<int:invoice_id>', methods=['GET'])
@login_required
def delete_invoice(invoice_id):
    """Delete an invoice through the ``DeleteInvoice`` stored procedure.

    Redirects to the add-invoice page afterwards. Returns a JSON error
    response with status 500 when there is no connection or the database
    raises, and 404 when the cursor reports zero affected rows.
    """
    connection = config.get_db_connection()
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("invoice"), 500)

    try:
        cursor = connection.cursor()
        cursor.callproc("DeleteInvoice", (invoice_id,))
        LogHelper.log_action("Delete invoice", f"User {current_user.id} Delete invoice'{ invoice_id}'")
        connection.commit()

        # NOTE(review): relies on cursor.rowcount being meaningful after
        # callproc() - confirm against the driver's behaviour.
        nothing_deleted = cursor.rowcount == 0
        if nothing_deleted:
            return HtmlHelper.json_response(ResponseHandler.fetch_failure("invoice"), 404)
        return redirect(url_for('add_invoice'))
    except mysql.connector.Error as e:
        print("Error deleting invoice:", e)
        return HtmlHelper.json_response(ResponseHandler.delete_failure("invoice"), 500)
    finally:
        cursor.close()
        connection.close()
# ---------- end Invoice controller ------------------
# ----------------------------- Payment controller ------------------------------------------
# this is Payment Page to add data
# @app.route('/add_payment', methods=['GET', 'POST'])
# def add_payment():
# connection = config.get_db_connection()
# payments = [] # List to hold payment history
#
# if not connection:
# return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500)
#
# try:
# cursor = connection.cursor()
#
# # Retrieve payment history
# # cursor.execute(
# # "SELECT Payment_Id, PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR FROM payment"
# # )
# # payments = cursor.fetchall()
# cursor.callproc("GetAllPayments")
# for result in cursor.stored_results():
# payments = result.fetchall()
#
# except mysql.connector.Error as e:
# print(f"Error fetching payment history: {e}")
# return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500)
# finally:
# cursor.close()
#
# if request.method == 'POST':
# pmc_no = request.form['PMC_No']
# invoice_no = request.form['invoice_No']
# amount = request.form['Payment_Amount']
# tds_amount = request.form['TDS_Payment_Amount']
# total_amount = request.form['total_amount']
# utr = request.form['utr']
#
# try:
# cursor = connection.cursor()
# cursor.callproc('SavePayment', (
# pmc_no, invoice_no, amount, tds_amount, total_amount, utr
# ))
# connection.commit()
# return redirect(url_for('add_payment')) # Redirect to add_payment page to reload the form
# except mysql.connector.Error as e:
# print(f"Error inserting payment: {e}")
# return HtmlHelper.json_response(ResponseHandler.add_failure("payment"), 500)
# finally:
# cursor.close()
# connection.close()
#
# return render_template('add_payment.html', payments=payments)
@app.route('/add_payment', methods=['GET', 'POST'])
@login_required
def add_payment():
    """Add a payment (POST) or render the payment form and history (GET).

    The payment history is loaded through the ``GetAllPayments`` stored
    procedure on every request. A POST then inserts the submitted payment via
    ``InsertPayments`` and redirects back to this page.

    Returns a plain-text message with status 500 when the history cannot be
    fetched, the payment cannot be inserted, or a POST arrives without a
    database connection.

    Fixes over the previous version: the connection is closed on every path
    (it used to be closed only after a POST), a POST without a connection no
    longer raises ``AttributeError``, and a failed insert is rolled back.
    """
    connection = config.get_db_connection()
    payments = []
    try:
        if connection:
            cursor = connection.cursor()
            try:
                cursor.callproc('GetAllPayments')
                for result in cursor.stored_results():
                    payments = result.fetchall()
            except mysql.connector.Error as e:
                print(f"Error fetching payment history: {e}")
                return "Failed to fetch payment history", 500
            finally:
                cursor.close()

        if request.method == 'POST':
            if not connection:
                return "Failed to add payment", 500

            pmc_no = request.form['PMC_No']
            invoice_no = request.form['invoice_No']
            amount = request.form['Payment_Amount']
            tds_amount = request.form['TDS_Payment_Amount']
            total_amount = request.form['total_amount']
            utr = request.form['utr']
            LogHelper.log_action("Add Payment", f"User {current_user.id} Add Payment'{ pmc_no}'")

            cursor = connection.cursor()
            try:
                cursor.callproc('InsertPayments', [
                    pmc_no, invoice_no, amount, tds_amount, total_amount, utr
                ])
                connection.commit()
                return redirect(url_for('add_payment'))
            except mysql.connector.Error as e:
                print(f"Error inserting payment: {e}")
                connection.rollback()
                return "Failed to add payment", 500
            finally:
                cursor.close()

        return render_template('add_payment.html', payments=payments)
    finally:
        # Runs for every return above, including the GET path.
        if connection:
            connection.close()
@app.route('/get_pmc_nos_by_subcontractor/<subcontractorId>')
@login_required
def get_pmc_nos_by_subcontractor(subcontractorId):
    """Return the PMC numbers linked to one subcontractor as JSON.

    Calls the ``GetDistinctPMCNoByContractorId`` stored procedure and returns
    the first column of every row under the ``pmc_nos`` key.

    Fixes over the previous version: the connection is now closed (only the
    cursor was), both are released even when the call raises, and ``results``
    defaults to an empty list when the procedure yields no result set.
    """
    connection = config.get_db_connection()
    cur = connection.cursor()
    results = []
    try:
        cur.callproc('GetDistinctPMCNoByContractorId', [subcontractorId])
        for result in cur.stored_results():
            results = result.fetchall()
    finally:
        cur.close()
        connection.close()

    pmc_nos = [row[0] for row in results]
    return jsonify({'pmc_nos': pmc_nos})
# Edit Payment Route
@app.route('/edit_payment/<int:payment_id>', methods=['GET', 'POST'])
@login_required
def edit_payment(payment_id):
    """Update a payment (POST) or render its edit form (GET).

    The existing payment is always loaded first through ``GetPaymentById``.
    A POST then calls ``UpdatePayment`` with the submitted form values and
    redirects to the add-payment page. Database errors while fetching or
    updating yield a JSON error response with status 500.
    """
    connection = config.get_db_connection()
    payment_data = {}
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500)

    try:
        cursor = connection.cursor()

        # Load the current values of the payment being edited.
        cursor.callproc("GetPaymentById", (payment_id,))
        for result_set in cursor.stored_results():
            payment_data = result_set.fetchone()

        if request.method == 'POST':
            form = request.form
            pmc_no = form['PMC_No']
            invoice_no = form['invoice_No']
            amount = form['Payment_Amount']
            tds_amount = form['TDS_Payment_Amount']
            total_amount = form['total_amount']
            utr = form['utr']
            LogHelper.log_action("Edit Payment", f"User {current_user.id} Edit Payment'{ pmc_no}'")

            update_args = (payment_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr)
            try:
                cursor.callproc("UpdatePayment", update_args)
                connection.commit()
                # Back to the payment list so the updated row is visible.
                return redirect(url_for('add_payment'))
            except mysql.connector.Error as e:
                print(f"Error updating payment: {e}")
                return HtmlHelper.json_response(ResponseHandler.update_failure("payment"), 500)
    except mysql.connector.Error as e:
        print(f"Error fetching payment data: {e}")
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500)
    finally:
        cursor.close()
        connection.close()

    return render_template('edit_payment.html', payment_data=payment_data)
# Delete Payment Route
@app.route('/delete_payment/<int:payment_id>', methods=['GET', 'POST'])
@login_required
def delete_payment(payment_id):
    """Delete one payment through the ``DeletePayment`` stored procedure.

    Args:
        payment_id: Integer payment id taken from the URL.

    Returns:
        A redirect to the ``add_payment`` view on success, a JSON failure
        response with status 404 when the cursor reports zero affected rows,
        or a JSON failure response with status 500 when the connection or
        the procedure call fails.
    """
    connection = config.get_db_connection()
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 500)
    # Bound before the try: if connection.cursor() raises, the finally clause
    # must not fail on an undefined name and skip connection.close().
    cursor = None
    try:
        cursor = connection.cursor()
        # cursor.execute("DELETE FROM payment WHERE Payment_Id = %s", (payment_id,))
        cursor.callproc("DeletePayment", (payment_id,))
        LogHelper.log_action("Delete Payment", f"User {current_user.id} Delete Payment'{ payment_id}'")
        connection.commit()
        # Check if any rows were deleted
        # NOTE(review): rowcount is read after callproc(); confirm the
        # connector reports the procedure's affected rows here.
        if cursor.rowcount == 0:
            return HtmlHelper.json_response(ResponseHandler.fetch_failure("payment"), 404)
        return redirect(url_for('add_payment'))  # Redirect back to the add_payment page
    except mysql.connector.Error as e:
        print(f"Error deleting payment: {e}")
        return HtmlHelper.json_response(ResponseHandler.delete_failure("payment"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
# --- end Payment controller -----------
# ------------------------- GST Release controller ------------------------------------------
@app.route('/add_gst_release', methods=['GET', 'POST'])
@login_required
def add_gst_release():
    """List GST releases (GET) or insert a new one from the form (POST).

    Every request first reads the ``gst_release`` table. On POST the
    submitted fields are inserted as a new row, the transaction is committed
    and the client is redirected back to this view.

    Returns:
        The rendered ``add_gst_release.html`` template, a redirect after a
        successful insert, or a JSON failure response with status 500 when
        the connection or a query fails.
    """
    connection = config.get_db_connection()
    gst_releases = []  # List to hold GST Release history
    invoices = []  # List to hold invoices for the dropdown (never filled here)
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500)
    # Bound before the try: if connection.cursor() raises, the finally clause
    # must not fail on an undefined name and skip connection.close().
    cursor = None
    try:
        cursor = connection.cursor()
        # Retrieve GST Release history
        cursor.execute("SELECT GST_Release_Id, PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR FROM gst_release")
        gst_releases = cursor.fetchall()
        # cursor.callproc("GetAllGSTReleases")
        # for result in cursor.stored_results():
        # gst_releases = result.fetchall()
        if request.method == 'POST':
            pmc_no = request.form['PMC_No']
            invoice_no = request.form['invoice_No']
            basic_amount = request.form['basic_amount']
            final_amount = request.form['final_amount']
            total_amount = request.form['total_amount']
            utr = request.form['utr']
            contractor_id = request.form['subcontractor_id']
            LogHelper.log_action("Add gst_release", f"User {current_user.id} Add gst_release'{ pmc_no}'")
            # cursor.callproc('SaveGSTRelease', (
            # pmc_no, invoice_no, basic_amount, final_amount,total_amount, utr
            # ))
            # connection.commit()
            cursor.execute("""
                INSERT INTO gst_release (PMC_No,
                invoice_no,
                Basic_Amount,
                Final_Amount,
                Total_Amount,
                UTR,
                Contractor_Id)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                """, (
                pmc_no,
                invoice_no,
                basic_amount,
                final_amount,
                total_amount,
                utr,
                contractor_id
            ))
            connection.commit()
            return redirect(url_for('add_gst_release'))  # Redirect to add_gst_release page
    except mysql.connector.Error as e:
        print(f"Error: {e}")
        return HtmlHelper.json_response(ResponseHandler.add_failure("GST Release"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
    return render_template('add_gst_release.html', invoices=invoices, gst_releases=gst_releases)
# update gst Release by id
@app.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def edit_gst_release(gst_release_id):
    """Show the edit form for one GST release (GET) or save its new values (POST).

    The current row is read from ``gst_release`` on every request. On POST
    the submitted fields are written with an UPDATE, the transaction is
    committed and the client is redirected to the ``add_gst_release`` view.

    Args:
        gst_release_id: Integer GST release id taken from the URL.

    Returns:
        The rendered ``edit_gst_release.html`` template, a redirect after a
        successful update, or a JSON failure response with status 500 when
        the connection, the fetch or the update fails.
    """
    connection = config.get_db_connection()
    gst_release_data = {}  # To hold the GST release data for the given ID
    invoices = []  # List to hold invoices for the dropdown (never filled here)
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500)
    # Bound before the try: if connection.cursor() raises, the finally clause
    # must not fail on an undefined name and skip connection.close().
    cursor = None
    try:
        cursor = connection.cursor()
        # Fetch the existing GST release data for the given gst_release_id
        cursor.execute(
            "SELECT GST_Release_Id, PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR FROM gst_release WHERE GST_Release_Id = %s",
            (gst_release_id,)
        )
        gst_release_data = cursor.fetchone()
        # cursor.callproc("GetGSTReleaseById", (gst_release_id,))
        # for result in cursor.stored_results():
        # gst_release_data = result.fetchone()
        if request.method == 'POST':
            pmc_no = request.form['PMC_No']
            invoice_no = request.form['invoice_No']
            basic_amount = request.form['basic_amount']
            final_amount = request.form['final_amount']
            total_amount = request.form['total_amount']
            utr = request.form['utr']
            LogHelper.log_action("Edit gst_release", f"User {current_user.id} Edit gst_release'{ pmc_no}'")
            try:
                cursor.execute("""
                    UPDATE gst_release
                    SET PMC_No = %s,
                    invoice_no = %s,
                    Basic_Amount = %s,
                    Final_Amount = %s,
                    Total_Amount = %s,
                    UTR = %s
                    WHERE GST_Release_Id = %s
                    """, (
                    pmc_no,
                    invoice_no,
                    basic_amount,
                    final_amount,
                    total_amount,
                    utr,
                    gst_release_id
                ))
                # cursor.callproc("UpdateGSTRelease", (gst_release_id, pmc_id, invoice_no, basic_amount, final_amount))
                # Commit explicitly, as the other write routes in this file
                # do; without it the UPDATE is discarded when the connection
                # closes unless autocommit is enabled.
                connection.commit()
                return redirect(url_for('add_gst_release'))  # Redirect to the page to view the updated list
            except mysql.connector.Error as e:
                print(f"Error updating GST Release: {e}")
                return HtmlHelper.json_response(ResponseHandler.update_failure("GST Release"), 500)
    except mysql.connector.Error as e:
        print(f"Error fetching GST Release data: {e}")
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
    return render_template('edit_gst_release.html', gst_release_data=gst_release_data, invoices=invoices)
# delete gst release by id
@app.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def delete_gst_release(gst_release_id):
    """Delete one GST release through the ``DeleteGSTRelease`` stored procedure.

    Args:
        gst_release_id: Integer GST release id taken from the URL.

    Returns:
        A redirect to the ``add_gst_release`` view on success, a JSON failure
        response with status 404 when the cursor reports zero affected rows,
        or a JSON failure response with status 500 when the connection or
        the procedure call fails.
    """
    connection = config.get_db_connection()
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 500)
    # Bound before the try: if connection.cursor() raises, the finally clause
    # must not fail on an undefined name and skip connection.close().
    cursor = None
    try:
        cursor = connection.cursor()
        # cursor.execute("DELETE FROM gst_release WHERE GST_Release_Id = %s", (gst_release_id,))
        cursor.callproc("DeleteGSTRelease", (gst_release_id,))
        LogHelper.log_action("delete gst_release", f"User {current_user.id} delete gst_release'{ gst_release_id}'")
        connection.commit()
        # Check if any rows were deleted
        # NOTE(review): rowcount is read after callproc(); confirm the
        # connector reports the procedure's affected rows here.
        if cursor.rowcount == 0:
            return HtmlHelper.json_response(ResponseHandler.fetch_failure("GST Release"), 404)
        return redirect(url_for('add_gst_release'))  # Redirect to the add_gst_release page
    except mysql.connector.Error as e:
        print(f"Error deleting GST Release: {e}")
        return HtmlHelper.json_response(ResponseHandler.delete_failure("GST Release"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
# --- end GST Release controller -----
# ------------------------- Subcontractor controller ------------------------------------------
@app.route('/subcontractor', methods=['GET', 'POST'])
@login_required
def subcontract():
    """List subcontractors (GET) or add one from the form and re-list (POST).

    GET loads all rows through the ``GetAllSubcontractors`` stored procedure.
    POST passes the submitted form fields to ``SaveContractor``, commits, and
    then reloads the list with ``GetAllSubcontractors`` so the rendered page
    includes the new row.

    Returns:
        The rendered ``add_subcontractor.html`` template, or a JSON failure
        response with status 500 when the connection, the fetch or the
        insert fails.
    """
    connection = config.get_db_connection()
    subcontractor = []
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500)
    # Bound before the try: if connection.cursor() raises, the finally clause
    # must not fail on an undefined name and skip connection.close().
    cursor = None
    try:
        cursor = connection.cursor()
        if request.method == 'GET':
            try:
                # cursor.execute('SELECT * FROM subcontractors;')
                # subcontractor = cursor.fetchall() # Fetch the current subcontractor list
                # connection.commit()
                cursor.callproc('GetAllSubcontractors')
                for result in cursor.stored_results():
                    subcontractor = result.fetchall()
            except Error as e:
                print(f"Error fetching data: {e}")
                return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500)
        if request.method == 'POST':
            contractor_data = {
                'Contractor_Name': request.form['Contractor_Name'],
                'Address': request.form['Address'],
                'Mobile_No': request.form['Mobile_No'],
                'PAN_No': request.form['PAN_No'],
                'Email': request.form['Email'],
                'Gender': request.form['Gender'],
                'GST_Registration_Type': request.form['GST_Registration_Type'],
                'GST_No': request.form['GST_No'],
                'Contractor_password': request.form['Contractor_password'],
            }
            try:
                # Argument order must match the SaveContractor procedure.
                cursor.callproc('SaveContractor', (
                    contractor_data['Contractor_Name'],
                    contractor_data['Address'],
                    contractor_data['Mobile_No'],
                    contractor_data['PAN_No'],
                    contractor_data['Email'],
                    contractor_data['Gender'],
                    contractor_data['GST_Registration_Type'],
                    contractor_data['GST_No'],
                    contractor_data['Contractor_password']
                ))
                connection.commit()
                # Re-fetch subcontractors after inserting the new one
                # cursor.execute('SELECT * FROM subcontractors')
                # subcontractor = cursor.fetchall()
                cursor.callproc('GetAllSubcontractors')
                for result in cursor.stored_results():
                    subcontractor = result.fetchall()
            except Error as e:
                print(f"Error inserting data: {e}")
                return HtmlHelper.json_response(ResponseHandler.add_failure("Subcontractor"), 500)
    except Error as e:
        print(f"Error handling subcontractor data: {e}")
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
    return render_template('add_subcontractor.html', subcontractor=subcontractor)
# update subcontractor by id
@app.route('/edit_subcontractor/<int:id>', methods=['GET', 'POST'])
@login_required
def edit_subcontractor(id):
    """Show the edit form for one subcontractor (GET) or save its new values (POST).

    The current row is loaded through the ``GetSubcontractorById`` stored
    procedure on every request. On POST the submitted fields are passed to
    ``UpdateSubcontractor``, the transaction is committed and the client is
    redirected to the ``subcontract`` view.

    Args:
        id: Integer contractor id taken from the URL. The name is fixed by
            the route pattern, so it shadows the builtin inside this view.

    Returns:
        The rendered ``edit_subcontractor.html`` template, a redirect after a
        successful update, a JSON failure response with status 404 when no
        row is found, or one with status 500 when the connection, the fetch
        or the update fails.
    """
    connection = config.get_db_connection()
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500)
    # Bound before the try: if connection.cursor() raises, the finally clause
    # must not fail on an undefined name and skip connection.close().
    cursor = None
    subcontractor = None
    try:
        cursor = connection.cursor()
        # Fetch existing subcontractor data by ID
        # cursor.execute('SELECT * FROM subcontractors WHERE Contractor_Id = %s', (id,))
        # subcontractor = cursor.fetchone()
        cursor.callproc("GetSubcontractorById", (id,))
        for contractors in cursor.stored_results():
            subcontractor = contractors.fetchone()
        if not subcontractor:
            return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 404)
        if request.method == 'POST':
            updated_data = {
                'Contractor_Name': request.form['Contractor_Name'],
                'Address': request.form['Address'],
                'Mobile_No': request.form['Mobile_No'],
                'PAN_No': request.form['PAN_No'],
                'Email': request.form['Email'],
                'Gender': request.form['Gender'],
                'GST_Registration_Type': request.form['GST_Registration_Type'],
                'GST_No': request.form['GST_No'],
                'Contractor_password': request.form['Contractor_password'],
                'id': id
            }
            LogHelper.log_action("Edit Subcontractor", f"User {current_user.id}Edit Subcontractor'{ id}'")
            try:
                # cursor.execute("""UPDATE subcontractors SET
                # Contractor_Name=%(Contractor_Name)s,
                # Address=%(Address)s,
                # Mobile_No=%(Mobile_No)s,
                # PAN_No=%(PAN_No)s,
                # Email=%(Email)s,
                # Gender=%(Gender)s,
                # GST_Registration_Type=%(GST_Registration_Type)s,
                # GST_No=%(GST_No)s,
                # Contractor_password=%(Contractor_password)s
                # WHERE Contractor_Id=%(id)s""", updated_data)
                # Argument order must match the UpdateSubcontractor procedure.
                cursor.callproc("UpdateSubcontractor", (
                    id,
                    updated_data['Contractor_Name'],
                    updated_data['Address'],
                    updated_data['Mobile_No'],
                    updated_data['PAN_No'],
                    updated_data['Email'],
                    updated_data['Gender'],
                    updated_data['GST_Registration_Type'],
                    updated_data['GST_No'],
                    updated_data['Contractor_password']
                ))
                connection.commit()
                return redirect(url_for('subcontract'))
            except Error as e:
                print(f"Error updating subcontractor: {e}")
                return HtmlHelper.json_response(ResponseHandler.update_failure("Subcontractor"), 500)
    except Error as e:
        print(f"Error fetching subcontractor data: {e}")
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
    return render_template('edit_subcontractor.html', subcontractor=subcontractor)
# delete Sub-Contractor methods by id ..
# @app.route('/deleteSubContractor/<int:id>', methods=['GET', 'POST'])
# def deleteSubContractor(id):
# connection = config.get_db_connection()
# if not connection:
# return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500)
# try:
# cursor = connection.cursor()
# # cursor.execute("DELETE FROM subcontractors WHERE Contractor_Id = %s", (id,))
# cursor.callproc("DeleteSubcontractor", (id,))
# connection.commit()
# # Check if any row was deleted (subcontractor found)
# if cursor.rowcount == 0:
# return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 404)
# except Error as e:
# print(f"Error deleting subcontractor: {e}")
# return HtmlHelper.json_response(ResponseHandler.delete_failure("Subcontractor"), 500)
# finally:
# cursor.close()
# connection.close()
# return redirect(url_for('subcontract'))
@app.route('/deleteSubContractor/<int:id>', methods=['GET', 'POST'])
@login_required
def deleteSubContractor(id):
    """Delete one subcontractor through the ``DeleteSubcontractor`` procedure.

    The row's existence is checked first with a SELECT. After the procedure
    call is committed, the first column of the procedure's result row is read
    as the number of affected rows.

    Args:
        id: Integer contractor id taken from the URL. The name is fixed by
            the route pattern, so it shadows the builtin inside this view.

    Returns:
        A redirect to the ``subcontract`` view on success, a JSON failure
        response with status 404 when the row does not exist or nothing was
        deleted, or one with status 500 when the connection or a query fails.
    """
    connection = config.get_db_connection()
    if not connection:
        return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor"), 500)
    # Bound before the try: if connection.cursor() raises, the finally clause
    # must not fail on an undefined name and skip connection.close().
    cursor = None
    try:
        cursor = connection.cursor()
        # Optional: check if subcontractor exists before attempting delete
        cursor.execute("SELECT 1 FROM subcontractors WHERE Contractor_Id = %s", (id,))
        if cursor.fetchone() is None:
            return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor not found"), 404)
        # Call stored procedure to delete subcontractor and related records
        cursor.callproc("DeleteSubcontractor", (id,))
        connection.commit()
        # Retrieve result from procedure (SELECT ROW_COUNT())
        affected_rows = 0
        for result in cursor.stored_results():
            row = result.fetchone()
            affected_rows = row[0] if row else 0
        # The action is logged even when the procedure reports zero rows.
        LogHelper.log_action("Delete Subcontractor", f"User {current_user.id}Delete Subcontractor'{ id}'")
        if affected_rows == 0:
            return HtmlHelper.json_response(ResponseHandler.fetch_failure("Subcontractor not deleted"), 404)
    except Error as e:
        print(f"Error deleting subcontractor: {e}")
        return HtmlHelper.json_response(ResponseHandler.delete_failure("Subcontractor"), 500)
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
    return redirect(url_for('subcontract'))  # redirect to subcontractor list page
# ------------------------------- Show Report Subcontractor ---------------------
# Directory (relative to the process working directory) where uploaded
# Excel files are stored.
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# exist_ok=True replaces the separate exists() check, so two processes
# starting at the same time cannot race between the check and the create.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Upload Excel file html page
@app.route('/upload_excel_file', methods=['GET', 'POST'])
@login_required
def upload():
    """Show the upload form (GET) or store an uploaded ``.xlsx`` file (POST).

    A posted file whose name ends with ``.xlsx`` is saved into the configured
    upload folder, the action is logged, and the client is redirected to
    ``show_table`` for that file. Any other POST falls through to the form.

    Login is required because the handler reads ``current_user.id`` for the
    log entry, as the other logged routes in this file do.

    Returns:
        A redirect to ``show_table`` after a successful upload, otherwise the
        rendered ``uploadExcelFile.html`` template.
    """
    if request.method == 'POST':
        file = request.files['file']
        if file and file.filename.endswith('.xlsx'):
            # The filename is client-supplied: keep only its last path
            # component (treating both separators alike) so a name such as
            # "../../x.xlsx" cannot escape the upload folder.
            safe_name = os.path.basename(file.filename.replace('\\', '/'))
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
            file.save(filepath)
            LogHelper.log_action("Upload Excel File", f"User {current_user.id}Upload Excel File'{file}'")
            return redirect(url_for('show_table', filename=safe_name))
    return render_template('uploadExcelFile.html')
# Show Excel data in tables
# @app.route('/show_table/<filename>')
# def show_table(filename):
# global data
# data = []
#
# filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# wb = openpyxl.load_workbook(filepath, data_only=True)
# sheet = wb.active
#
# # Extract key file information from the first 4 rows
# file_info = {
# "Subcontractor": sheet.cell(row=1, column=2).value,
# "State": sheet.cell(row=2, column=2).value,
# "District": sheet.cell(row=3, column=2).value,
# "Block": sheet.cell(row=4, column=2).value,
# }
#
# errors = []
# subcontractor_data = None
# state_data = None
# district_data = None
# block_data = None
#
# # Database connection
# connection = config.get_db_connection()
# if connection:
# try:
# cursor = connection.cursor(dictionary=True)
#
# # Validate State
# # cursor.execute("SELECT State_ID, State_Name FROM states WHERE State_Name = %s", (file_info['State'],))
# # state_data = cursor.fetchone()
# cursor.callproc('GetStateByName', [file_info['State']])
# for result in cursor.stored_results():
# state_data = result.fetchone()
#
# if not state_data:
# errors.append(f"State '{file_info['State']}' is not valid. Please add it.")
#
# # Validate District
# if state_data:
# # cursor.execute(
# # "SELECT District_ID, District_Name FROM districts WHERE District_Name = %s AND State_ID = %s",
# # (file_info['District'], state_data['State_ID'])
# # )
# # district_data = cursor.fetchone()
# cursor.callproc('GetDistrictByNameAndStates', [file_info['District'], state_data['State_ID']])
# for result in cursor.stored_results():
# district_data = result.fetchone()
#
# if not district_data:
# errors.append(
# f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.")
#
# # Validate Block
# if district_data:
# # cursor.execute(
# # "SELECT Block_Id, Block_Name FROM blocks WHERE Block_Name = %s AND District_ID = %s",
# # (file_info['Block'], district_data['District_ID'])
# # )
# # block_data = cursor.fetchone()
# cursor.callproc('GetBlockByNameAndDistricts', [file_info['Block'], district_data['District_ID']])
# for result in cursor.stored_results():
# block_data = result.fetchone()
#
# if not block_data:
# errors.append(
# f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.")
#
# # old code
# # # Validate Subcontractor
# # cursor.execute("SELECT Contractor_Id, Contractor_Name FROM SubContractors WHERE Contractor_Name = %s",
# # (file_info['Subcontractor'],))
# # subcontractor_data = cursor.fetchone()
# cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']])
# for result in cursor.stored_results():
# subcontractor_data = result.fetchone()
#
# if not subcontractor_data:
# # cursor.execute("INSERT INTO subcontractors (Contractor_Name) VALUES (%s)",
# # (file_info['Subcontractor'],))
# # connection.commit()
# cursor.callproc('InsertSubcontractor', [file_info['Subcontractor']])
# connection.commit()
#
# # cursor.execute("SELECT Contractor_Id, Contractor_Name FROM SubContractors WHERE Contractor_Name = %s",
# # (file_info['Subcontractor'],))
# # subcontractor_data = cursor.fetchone()
# cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']])
# for result in cursor.stored_results():
# subcontractor_data = result.fetchone()
#
# # new code
# # cursor.callproc('ValidateAndInsertSubcontractor', (file_info['Subcontractor'], 0, ''))
# #
# # for con in cursor.stored_results():
# # subcontractor_data = con.fetchone()
# # print("subcon:",subcontractor_data)
# #
# # print("subcontractor_data",subcontractor_data)
#
# # Get hold types data from database (for faster lookup)
# # cursor.execute("SELECT hold_type_id, hold_type FROM hold_types")
# # hold_types_data = cursor.fetchall()
#
# cursor.callproc("GetAllHoldTypes")
# for ht in cursor.stored_results():
# hold_types_data = ht.fetchall()
#
# hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if
# row['hold_type']}
#
# cursor.close()
# except mysql.connector.Error as e:
# print(f"Database error: {e}")
# return "Database operation failed", 500
# finally:
# connection.close()
#
# # Extract dynamic variable names from row 5 and detect "hold" columns
# variables = {}
# hold_columns = []
# hold_counter = 0
#
# for j in range(1, sheet.max_column + 1):
# col_value = sheet.cell(row=5, column=j).value
# if col_value:
# variables[col_value] = j # Store column name with its position
#
# # Check if the column header contains the word 'hold'
# if 'hold' in str(col_value).lower():
# hold_counter += 1
# # Lookup hold type id from database
# hold_type_key = str(col_value).lower().strip()
# hold_type_id = hold_types_lookup.get(hold_type_key, None)
# hold_columns.append({
# 'column_name': col_value,
# 'column_number': j,
# 'hold_type_id': hold_type_id
# })
#
# # Extract data dynamically based on row numbers
# for i in range(6, sheet.max_row + 1):
# row_data = {}
# if sheet.cell(row=i, column=1).value:
# row_data["Row Number"] = i # Store row number
# for var_name, col_num in variables.items():
# row_data[var_name] = sheet.cell(row=i, column=col_num).value
# # Check if at least 4 non-empty cells exist in the row
# if sum(1 for value in row_data.values() if value) >= 4:
# data.append(row_data)
#
# # For debugging or console output, you can print the hold columns info
# for hold in hold_columns:
# if hold['hold_type_id']:
# print(
# f" if Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
# else:
# errors.append(
# f"Hold Type not added ! Column name '{hold['column_name']}'.")
# print(
# f" else Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
#
# return render_template(
# 'show_excel_file.html',
# file_info=file_info,
# variables=variables,
# data=data,
# subcontractor_data=subcontractor_data,
# state_data=state_data,
# district_data=district_data,
# block_data=block_data,
# errors=errors,
# hold_columns=hold_columns,
# hold_counter=hold_counter
# )
@app.route('/show_table/<filename>')
def show_table(filename):
    """Parse an uploaded workbook, check its header against the DB, and render it.

    Workbook layout as read by this function (active sheet):
      * rows 1-4, column 2: subcontractor, state, district and block names;
      * row 5: column headers; any header containing "hold" is treated as a
        hold column and looked up among the known hold types;
      * rows 6 onward: data rows.

    A data row is kept only when its first column is non-empty and at least
    four of its collected values (including the stored row number) are
    truthy. The kept rows are written to the module-level ``data`` list.

    Side effect: when the subcontractor name is not found, it is inserted
    through the ``InsertSubcontractor`` procedure and committed.

    Args:
        filename: Name of a file inside the configured upload folder.

    Returns:
        The rendered ``show_excel_file.html`` template, or a plain-text
        message with status 500 when a MySQL error occurs.
    """
    # NOTE(review): this view writes to the database but, unlike most routes
    # in this file, is not protected by @login_required - confirm intent.
    # NOTE(review): `data` is module-global state shared by all requests.
    global data
    data = []
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    wb = openpyxl.load_workbook(filepath, data_only=True)
    sheet = wb.active
    file_info = {
        "Subcontractor": sheet.cell(row=1, column=2).value,
        "State": sheet.cell(row=2, column=2).value,
        "District": sheet.cell(row=3, column=2).value,
        "Block": sheet.cell(row=4, column=2).value,
    }
    errors = []
    subcontractor_data = None
    state_data = None
    district_data = None
    block_data = None
    # Defined up front so the header scan below still works (every hold
    # column simply stays unresolved) when no DB connection is available.
    hold_types_lookup = {}
    connection = config.get_db_connection()
    if connection:
        try:
            cursor = connection.cursor(dictionary=True)
            print(f"Calling GetStateByName with: {file_info['State']}")
            cursor.callproc('GetStateByName', [file_info['State']])
            for result in cursor.stored_results():
                state_data = result.fetchone()
            if not state_data:
                errors.append(f"State '{file_info['State']}' is not valid. Please add it.")
            # District is only checked under a valid state, block only under
            # a valid district.
            if state_data:
                print(f"Calling GetDistrictByNameAndStates with: {file_info['District']}, {state_data['State_ID']}")
                cursor.callproc('GetDistrictByNameAndStates', [file_info['District'], state_data['State_ID']])
                for result in cursor.stored_results():
                    district_data = result.fetchone()
                if not district_data:
                    errors.append(f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.")
            if district_data:
                print(f"Calling GetBlockByNameAndDistricts with: {file_info['Block']}, {district_data['District_ID']}")
                cursor.callproc('GetBlockByNameAndDistricts', [file_info['Block'], district_data['District_ID']])
                for result in cursor.stored_results():
                    block_data = result.fetchone()
                if not block_data:
                    errors.append(f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.")
            print(f"Calling GetSubcontractorByName with: {file_info['Subcontractor']}")
            cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']])
            for result in cursor.stored_results():
                subcontractor_data = result.fetchone()
            if not subcontractor_data:
                # Unknown subcontractor: create it, then read it back.
                print(f"Inserting subcontractor: {file_info['Subcontractor']}")
                cursor.callproc('InsertSubcontractor', [file_info['Subcontractor']])
                connection.commit()
                print(f"Calling GetSubcontractorByName again with: {file_info['Subcontractor']}")
                cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']])
                for result in cursor.stored_results():
                    subcontractor_data = result.fetchone()
            print("Calling GetAllHoldTypes")
            cursor.callproc("GetAllHoldTypes")
            hold_types_data = []
            for ht in cursor.stored_results():
                hold_types_data = ht.fetchall()
            # Lower-cased hold type name -> hold type id.
            hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if row['hold_type']}
            cursor.close()
        except mysql.connector.Error as e:
            print(f"Database error: {e}")
            # NOTE(review): the raw DB error text is returned to the client.
            return f"Database operation failed: {e}", 500
        finally:
            connection.close()
    variables = {}  # header text -> 1-based column number
    hold_columns = []
    hold_counter = 0
    for j in range(1, sheet.max_column + 1):
        col_value = sheet.cell(row=5, column=j).value
        if col_value:
            variables[col_value] = j
            if 'hold' in str(col_value).lower():
                hold_counter += 1
                hold_type_key = str(col_value).lower().strip()
                hold_type_id = hold_types_lookup.get(hold_type_key, None)
                hold_columns.append({
                    'column_name': col_value,
                    'column_number': j,
                    'hold_type_id': hold_type_id
                })
    for i in range(6, sheet.max_row + 1):
        row_data = {}
        if sheet.cell(row=i, column=1).value:
            row_data["Row Number"] = i
            for var_name, col_num in variables.items():
                row_data[var_name] = sheet.cell(row=i, column=col_num).value
            if sum(1 for value in row_data.values() if value) >= 4:
                data.append(row_data)
    # Report every hold column whose header matched no known hold type.
    for hold in hold_columns:
        if hold['hold_type_id']:
            print(f" if Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
        else:
            errors.append(f"Hold Type not added ! Column name '{hold['column_name']}'.")
            print(f" else Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
    return render_template(
        'show_excel_file.html',
        file_info=file_info,
        variables=variables,
        data=data,
        subcontractor_data=subcontractor_data,
        state_data=state_data,
        district_data=district_data,
        block_data=block_data,
        errors=errors,
        hold_columns=hold_columns,
        hold_counter=hold_counter
    )
# Show Excel data in tables
# @app.route('/show_table/<filename>')
# def show_table(filename):
# global data
# data = []
#
# filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# wb = openpyxl.load_workbook(filepath, data_only=True)
# sheet = wb.active
#
# # Extract key file information from the first 4 rows
# file_info = {
# "Subcontractor": sheet.cell(row=1, column=2).value,
# "State": sheet.cell(row=2, column=2).value,
# "District": sheet.cell(row=3, column=2).value,
# "Block": sheet.cell(row=4, column=2).value,
# }
#
# errors = []
# subcontractor_data = None
# state_data = None
# district_data = None
# block_data = None
#
# # Database connection
# connection = config.get_db_connection()
# if connection:
# try:
# cursor = connection.cursor(dictionary=True)
#
# # Validate State
# cursor.execute("SELECT State_ID, State_Name FROM states WHERE State_Name = %s", (file_info['State'],))
# state_data = cursor.fetchone()
# if not state_data:
# errors.append(f"State '{file_info['State']}' is not valid. Please add it.")
#
# # Validate District
# if state_data:
# cursor.execute(
# "SELECT District_ID, District_Name FROM districts WHERE District_Name = %s AND State_ID = %s",
# (file_info['District'], state_data['State_ID'])
# )
# district_data = cursor.fetchone()
# if not district_data:
# errors.append(
# f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.")
#
# # Validate Block
# if district_data:
# cursor.execute(
# "SELECT Block_Id, Block_Name FROM blocks WHERE Block_Name = %s AND District_ID = %s",
# (file_info['Block'], district_data['District_ID'])
# )
# block_data = cursor.fetchone()
# if not block_data:
# errors.append(
# f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.")
#
#
# # old code
# # # Validate Subcontractor
# cursor.execute("SELECT Contractor_Id, Contractor_Name FROM subcontractors WHERE Contractor_Name = %s",
# (file_info['Subcontractor'],))
# subcontractor_data = cursor.fetchone()
#
# if not subcontractor_data:
# cursor.execute("INSERT INTO subcontractors (Contractor_Name) VALUES (%s)",
# (file_info['Subcontractor'],))
# connection.commit()
# cursor.execute("SELECT Contractor_Id, Contractor_Name FROM subcontractors WHERE Contractor_Name = %s",
# (file_info['Subcontractor'],))
# subcontractor_data = cursor.fetchone()
#
# # new code
# # cursor.callproc('ValidateAndInsertSubcontractor', (file_info['Subcontractor'], 0, ''))
# #
# # for con in cursor.stored_results():
# # subcontractor_data = con.fetchone()
# # print("subcon:",subcontractor_data)
# #
# # print("subcontractor_data",subcontractor_data)
#
# # Get hold types data from database (for faster lookup)
# # cursor.execute("SELECT hold_type_id, hold_type FROM hold_types")
# # hold_types_data = cursor.fetchall()
#
# cursor.callproc("GetAllHoldTypes")
# for ht in cursor.stored_results():
# hold_types_data = ht.fetchall()
#
#
# hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if row['hold_type']}
#
#
# cursor.close()
# except mysql.connector.Error as e:
# print(f"Database error: {e}")
#
# # return "Database operation failed", 500
# return f"{e}",500
# finally:
# connection.close()
#
# # Extract dynamic variable names from row 5 and detect "hold" columns
# variables = {}
# hold_columns = []
# hold_counter = 0
#
# for j in range(1, sheet.max_column + 1):
# col_value = sheet.cell(row=5, column=j).value
# if col_value:
# variables[col_value] = j # Store column name with its position
#
# # Check if the column header contains the word 'hold'
# if 'hold' in str(col_value).lower():
# hold_counter += 1
# # Lookup hold type id from database
# hold_type_key = str(col_value).lower().strip()
# hold_type_id = hold_types_lookup.get(hold_type_key, None)
# hold_columns.append({
# 'column_name': col_value,
# 'column_number': j,
# 'hold_type_id': hold_type_id
# })
#
# # Extract data dynamically based on row numbers
# for i in range(6, sheet.max_row + 1):
# row_data = {}
# if sheet.cell(row=i, column=1).value:
# row_data["Row Number"] = i # Store row number
# for var_name, col_num in variables.items():
# row_data[var_name] = sheet.cell(row=i, column=col_num).value
# # Check if at least 4 non-empty cells exist in the row
# if sum(1 for value in row_data.values() if value) >= 4:
# data.append(row_data)
#
# # For debugging or console output, you can print the hold columns info
# for hold in hold_columns:
# if hold['hold_type_id']:
# print(
# f" if Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
# else:
# errors.append(
# f"Hold Type not added ! Column name '{hold['column_name']}'.")
# print(
# f" else Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
#
# return render_template(
# 'show_excel_file.html',
# file_info=file_info,
# variables=variables,
# data=data,
# subcontractor_data=subcontractor_data,
# state_data=state_data,
# district_data=district_data,
# block_data=block_data,
# errors=errors,
# hold_columns=hold_columns,
# hold_counter=hold_counter
# )
# save Excel data
def _lookup_village_id(cursor, block_id, village_name):
    """Return the id reported by the ``GetVillageId`` procedure, or ``None``.

    The first column of the row fetched from each result set is used; when a
    result set has no row the id is ``None``.
    """
    village_id = None
    cursor.callproc("GetVillageId", (block_id, village_name))
    for result_set in cursor.stored_results():
        row = result_set.fetchone()
        village_id = row[0] if row else None
    return village_id


@app.route('/save_data', methods=['POST'])
@login_required
def save_data():
    """Persist every row of the module-level ``data`` list to the database.

    Form fields read: ``subcontractor_data``, ``block_data`` and
    ``hold_columns`` (a Python-literal list of dicts carrying ``column_name``
    and ``hold_type_id``).

    For each row, based on keywords found in ``Invoice_Details``:
      * "village" and "work"  -> ``SaveInvoice`` plus one ``InsertHoldJoinData``
        call per hold column that has a value in the row;
      * "credit note" / "logging report" -> row inserted into ``credit_note``;
      * a hold-release keyword -> row inserted into ``hold_release``;
        otherwise "gst" / "release" / "note" -> row inserted into ``gst_release``;
      * PMC_No, Total_Amount and UTR all present -> ``SavePayment``.

    Returns a JSON body with status 200 on success, 400 when there is nothing
    to save, and 500 (after a rollback) when any step raises.

    Login is required because the audit log reads ``current_user.id``.
    """
    subcontractor_id = request.form.get("subcontractor_data")
    block_id = request.form.get("block_data")
    hold_columns = request.form.get("hold_columns")

    # `data` is never assigned in this function, so it is read from module scope.
    # NOTE(review): presumably filled by the Excel upload view - confirm.
    if not data:
        return jsonify({"error": "No data provided to save"}), 400

    # Matched against lower-cased text, so every keyword must be lower case.
    # NOTE(review): 'ht' is a substring test and also matches words such as
    # "right" or "oht" - confirm this is intended.
    hold_release_keywords = (
        'excess hold',
        'ht',
        'hold release amount',
        'dpr excess hold amount',
        'excess hold amount',
        'multi to single layer bill',
        'hold amount',
        'logging report',
    )

    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        for entry in data:
            pmc_no = entry.get("PMC_No")
            invoice_details = entry.get("Invoice_Details", '')
            raw_invoice_date = entry.get("Invoice_Date")
            invoice_date = raw_invoice_date.strftime('%Y-%m-%d') if raw_invoice_date else None
            invoice_no = entry.get("Invoice_No", '')
            basic_amount = entry.get("Basic_Amount", 0.00)
            debit_amount = entry.get("Debit_Amount", 0.00)
            after_debit_amount = entry.get("After_Debit_Amount", 0.00)
            amount = entry.get("Amount", 0.00)
            gst_amount = entry.get("GST_Amount", 0.00)
            tds_amount = entry.get("TDS_Amount", 0.00)
            sd_amount = entry.get("SD_Amount", 0.00)
            on_commission = entry.get("On_Commission", 0.00)
            hydro_testing = entry.get("Hydro_Testing", 0.00)
            gst_sd_amount = entry.get("GST_SD_Amount", 0.00)
            final_amount = entry.get("Final_Amount", 0.00)
            payment_amount = entry.get("Payment_Amount", 0.00)
            total_amount = entry.get("Total_Amount", 0.00)
            tds_payment_amount = entry.get("TDS_Payment_Amount", 0.00)
            utr = entry.get("UTR", '')

            # An empty cell arrives as None; treat it like an empty description
            # so the keyword tests below cannot fail on `.lower()`.
            details_text = str(invoice_details) if invoice_details else ''
            details_lower = details_text.lower()
            words = details_lower.split()

            # "<village name> village <work type> work": everything before the
            # word "village" is the village name, everything up to and
            # including "work" is the work type.
            village_name = None
            work_type = None
            village_pos = None
            village_id = 0
            if 'village' in words:
                village_pos = words.index('village')
                village_name = " ".join(words[:village_pos])
            if 'work' in words:
                work_pos = words.index('work')
                if village_name:
                    work_type = " ".join(words[village_pos + 1:work_pos + 1])
                else:
                    work_type = " ".join(words[:work_pos + 1])

            if 'village' in details_lower and 'work' in details_lower:
                print("village_name ::", village_name, "|| work_type ::", work_type)
                if block_id and village_name:
                    village_id = _lookup_village_id(cursor, block_id, village_name)
                    if not village_id:
                        # Unknown village for this block: create it, then re-read its id.
                        cursor.callproc("SaveVillage", (village_name, block_id))
                        village_id = _lookup_village_id(cursor, block_id, village_name)
                    print("village_id :", village_id)
                    print("block_id :", block_id)
                print("invoice :", pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no,
                      basic_amount, debit_amount, after_debit_amount, amount, gst_amount, tds_amount,
                      sd_amount, on_commission, hydro_testing, gst_sd_amount, final_amount)

                # The trailing 0 is a placeholder argument; the value at that
                # position in callproc's return is used as the new invoice id.
                args = (
                    pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no,
                    basic_amount, debit_amount, after_debit_amount, amount, gst_amount, tds_amount,
                    sd_amount, on_commission, hydro_testing, gst_sd_amount, final_amount,
                    subcontractor_id, 0
                )
                print("All invoice Details ", args)
                results = cursor.callproc('SaveInvoice', args)
                invoice_id = results[-1]
                print("invoice id from the excel ", invoice_id)

                # The form delivers hold_columns as text; parse it the first
                # time it is needed and reuse the parsed list afterwards.
                if isinstance(hold_columns, str):
                    hold_columns = ast.literal_eval(hold_columns)

                if isinstance(hold_columns, list) and all(isinstance(hold, dict) for hold in hold_columns):
                    for hold in hold_columns:
                        print(f"Processing hold: {hold}")
                        hold_column_name = hold.get('column_name')
                        hold_type_id = hold.get('hold_type_id')
                        if not hold_column_name:
                            print(f"Invalid hold entry: {hold}")
                            continue
                        hold_amount = entry.get(hold_column_name)
                        if hold_amount is None:
                            continue
                        print(f"Processing hold type: {hold_column_name}, Hold Amount: {hold_amount}")
                        hold_join_data = {
                            "Contractor_Id": subcontractor_id,
                            "Invoice_Id": invoice_id,
                            "hold_type_id": hold_type_id,
                            "hold_amount": hold_amount
                        }
                        # No commit here: the single commit after the loop keeps
                        # the upload atomic, so the rollback below undoes it all.
                        cursor.callproc('InsertHoldJoinData', [
                            hold_join_data['Contractor_Id'], hold_join_data['Invoice_Id'],
                            hold_join_data['hold_type_id'], hold_join_data['hold_amount']
                        ])
                        print(f"Inserted hold join data: {hold_join_data}")
                else:
                    print("Hold columns data is not a valid list of dictionaries.")
            # ----------------------------- Credit Note -----------------------------
            elif any(keyword in details_lower for keyword in ('credit note', 'logging report')):
                print("Credit note found:", pmc_no, invoice_no, basic_amount, debit_amount, final_amount,
                      after_debit_amount, gst_amount, amount, final_amount, payment_amount, total_amount, utr, invoice_no)
                cursor.execute(
                    """INSERT INTO credit_note (PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount, GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, Contractor_Id, invoice_no) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s)""",
                    (pmc_no, invoice_details, basic_amount, debit_amount, after_debit_amount, gst_amount, amount,
                     final_amount, payment_amount, total_amount, utr, subcontractor_id, invoice_no))

            # ----------------------------- Hold Release ----------------------------
            # NOTE(review): this check is evaluated for every row, independent of
            # the invoice / credit-note branch above, and the GST branch is its
            # `elif`. A "credit note" row therefore also reaches the GST branch
            # through the 'note' keyword - confirm the intended nesting.
            normalized_details = re.sub(r'\s+', ' ', details_text.strip()).lower()
            if any(kw in normalized_details for kw in hold_release_keywords):
                print("✅ Match found. Inserting hold release for:", invoice_details)
                # NOTE(review): the Total_Amount column receives Final_Amount - confirm.
                cursor.execute("""
                    INSERT INTO hold_release (PMC_No, Invoice_No, Invoice_Details, Basic_Amount,
                    Total_Amount, UTR, Contractor_Id)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    """, (
                    pmc_no, invoice_no, invoice_details, basic_amount, final_amount, utr,
                    subcontractor_id
                ))
                print("✅ Hold release inserted for:", pmc_no, invoice_details)
            # ----------------------------- GST Release -----------------------------
            elif any(keyword in details_lower for keyword in ('gst', 'release', 'note')):
                print("Gst rels :", pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, subcontractor_id)
                cursor.execute(
                    """INSERT INTO gst_release (PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR, Contractor_Id) VALUES (%s,%s, %s, %s, %s, %s, %s)""",
                    (pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, subcontractor_id))

            # ------------------------------- Payment -------------------------------
            if pmc_no and total_amount and utr:
                print("Payment :", pmc_no, invoice_no, payment_amount, tds_payment_amount, total_amount, utr)
                cursor.callproc("SavePayment",
                                (pmc_no, invoice_no, payment_amount, tds_payment_amount, total_amount, utr))

            # Logged once the row has been processed so the parsed village name
            # (rather than a not-yet-computed None) ends up in the audit trail.
            LogHelper.log_action("Data saved", f"User {current_user.id} Data saved'{ village_name}'")

        connection.commit()
        return jsonify({"success": "Data saved successfully!"}), 200
    except Exception as e:
        connection.rollback()
        return jsonify({"error": f"An unexpected error occurred: {e}"}), 500
    finally:
        cursor.close()
        connection.close()
# ---------------------- Report --------------------------------
# call report page
@app.route('/report')
def report_page():
    """Serve the report search page by rendering ``report.html``."""
    report_template = 'report.html'
    return render_template(report_template)
# Search contractors by any combination of the form filters and return the matching rows as JSON
@app.route('/search_contractor', methods=['POST'])
@login_required
def search_contractor():
    """Search contractors through the ``search_contractor_info`` procedure.

    Form fields read: ``subcontractor_name``, ``pmc_no``, ``state``,
    ``district``, ``block``, ``village``, ``year_from`` and ``year_to``.
    Empty fields are passed to the procedure as ``None``.

    Returns the rows of the procedure's last result set as JSON, an empty
    list when it produced no result set, or a 400 JSON error when no usable
    filter was supplied. A year range only counts as a filter when both
    ``year_from`` and ``year_to`` are given.

    Login is required because the audit log reads ``current_user.id``.
    """
    form = request.form
    subcontractor_name = form.get('subcontractor_name')
    pmc_no = form.get('pmc_no')
    state = form.get('state')
    district = form.get('district')
    block = form.get('block')
    village = form.get('village')
    year_from = form.get('year_from')
    year_to = form.get('year_to')

    LogHelper.log_action("Search contractor", f"User {current_user.id} Search contractor'{ subcontractor_name}'")

    has_filter = (
        any((subcontractor_name, pmc_no, state, district, block, village))
        or (year_from and year_to)
    )
    if not has_filter:
        return jsonify({"error": "At least one field is required for search."}), 400

    # Open the connection only after validation so the early return above
    # cannot leak it, and always release it afterwards.
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.callproc("search_contractor_info", [
            subcontractor_name or None,
            pmc_no or None,
            state or None,
            district or None,
            block or None,
            village or None,
            year_from or None,
            year_to or None
        ])
        # Default to an empty list so a procedure without a result set yields
        # [] rather than an unbound name.
        data = []
        for result in cursor.stored_results():
            data = result.fetchall()
    finally:
        cursor.close()
        connection.close()
    return jsonify(data)
@app.route('/contractor_report/<int:contractor_id>')
def contractor_report(contractor_id):
    """Render ``subcontractor_report.html`` for one contractor.

    Runs seven queries on a single buffered dictionary cursor (contractor
    details, hold types, invoices, GST releases, hold releases, credit notes,
    payments), sums selected columns into ``total`` and passes everything to
    the template together with today's date.

    Returns the rendered page, or a plain-text message with status 500 when
    any query or the totals computation raises.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True, buffered=True)
    only_contractor = (contractor_id,)

    def column_total(rows, column):
        # NULL cells count as 0; the result is always a float.
        return float(sum(row[column] or 0 for row in rows))

    try:
        # Contractor details
        cursor.execute("""
            SELECT DISTINCT s.Contractor_Name, st.State_Name, d.District_Name, b.Block_Name,
            s.Mobile_No, s.GST_Registration_Type, s.GST_No, s.PAN_No, s.Email, s.Address
            FROM subcontractors s
            LEFT JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id
            LEFT JOIN villages v ON asg.Village_Id = v.Village_Id
            LEFT JOIN blocks b ON v.Block_Id = b.Block_Id
            LEFT JOIN districts d ON b.District_id = d.District_id
            LEFT JOIN states st ON d.State_Id = st.State_Id
            WHERE s.Contractor_Id = %s
            """, only_contractor)
        contractor_details = cursor.fetchone()

        # Hold types that appear on this contractor's invoices
        cursor.execute("""
            SELECT DISTINCT ht.hold_type_id, ht.hold_type
            FROM invoice_subcontractor_hold_join h
            JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
            JOIN invoice i ON h.Invoice_Id = i.Invoice_Id
            WHERE h.Contractor_Id = %s
            """, only_contractor)
        hold_type_rows = cursor.fetchall()

        # Invoices, left-joined to their hold amounts
        cursor.execute("""
            SELECT DISTINCT i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details,
            i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount,
            i.After_Debit_Amount, i.Amount, i.GST_Amount, i.TDS_Amount, i.SD_Amount,
            i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount,
            i.Final_Amount, h.hold_amount, ht.hold_type
            FROM assign_subcontractors asg
            INNER JOIN villages v ON asg.Village_Id = v.Village_Id
            INNER JOIN invoice i ON i.Village_Id = v.Village_Id AND i.PMC_No = asg.PMC_No
            LEFT JOIN invoice_subcontractor_hold_join h ON i.Invoice_Id = h.Invoice_Id AND h.Contractor_Id = asg.Contractor_Id
            LEFT JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
            WHERE asg.Contractor_Id = %s
            ORDER BY i.PMC_No ASC
            """, only_contractor)
        invoice_rows = cursor.fetchall()

        # GST releases that match one of the contractor's invoices
        cursor.execute("""
            SELECT gr.pmc_no, gr.invoice_no, gr.basic_amount, gr.final_amount
            FROM gst_release gr
            INNER JOIN (
            SELECT DISTINCT i.PMC_No, i.Invoice_No
            FROM invoice i
            JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No AND i.Village_Id = a.Village_Id
            WHERE a.Contractor_Id = %s
            ) x ON gr.pmc_no = x.PMC_No AND gr.invoice_no = x.Invoice_No
            ORDER BY gr.pmc_no ASC
            """, only_contractor)
        gst_release_rows = cursor.fetchall()

        # Hold releases
        cursor.execute("SELECT * FROM hold_release WHERE Contractor_Id=%s", only_contractor)
        hold_release_rows = cursor.fetchall()
        print(hold_release_rows)

        # Credit notes
        cursor.execute("select * from credit_note where Contractor_Id=%s", only_contractor)
        credit_note_rows = cursor.fetchall()
        print(credit_note_rows)

        # Payments: those matching an invoice of the contractor, plus those
        # without an invoice number whose PMC is assigned to the contractor.
        cursor.execute("""
            SELECT p.pmc_no, p.invoice_no, p.Payment_Amount, p.TDS_Payment_Amount, p.Total_amount, p.utr
            FROM payment p
            WHERE
            EXISTS (
            SELECT 1
            FROM invoice i
            JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No AND i.Village_Id = a.Village_Id
            WHERE a.Contractor_Id = %s
            AND i.PMC_No = p.pmc_no AND i.Invoice_No = p.invoice_no
            )
            OR (
            p.invoice_no IS NULL
            AND EXISTS (
            SELECT 1
            FROM assign_subcontractors a
            WHERE a.Contractor_Id = %s
            AND a.PMC_No = p.pmc_no
            )
            )
            ORDER BY p.pmc_no ASC
            """, (contractor_id, contractor_id))
        payment_rows = cursor.fetchall()

        # Totals: template key -> source column, grouped by the row list summed.
        # NOTE(review): the invoice query yields one row per joined hold record,
        # so invoice-level sums may count an invoice more than once when it
        # carries several holds - confirm against the template.
        invoice_columns = (
            ("sum_invo_basic_amt", 'Basic_Amount'),
            ("sum_invo_debit_amt", 'Debit_Amount'),
            ("sum_invo_after_debit_amt", 'After_Debit_Amount'),
            ("sum_invo_amt", 'Amount'),
            ("sum_invo_gst_amt", 'GST_Amount'),
            ("sum_invo_tds_amt", 'TDS_Amount'),
            ("sum_invo_ds_amt", 'SD_Amount'),
            ("sum_invo_on_commission", 'On_Commission'),
            ("sum_invo_hydro_test", 'Hydro_Testing'),
            ("sum_invo_gst_sd_amt", 'GST_SD_Amount'),
            ("sum_invo_final_amt", 'Final_Amount'),
            ("sum_invo_hold_amt", 'hold_amount'),
        )
        gst_columns = (
            ("sum_gst_basic_amt", 'basic_amount'),
            ("sum_gst_final_amt", 'final_amount'),
        )
        payment_columns = (
            ("sum_pay_payment_amt", 'Payment_Amount'),
            ("sum_pay_tds_payment_amt", 'TDS_Payment_Amount'),
            ("sum_pay_total_amt", 'Total_amount'),
        )
        total = {}
        for rows, columns in ((invoice_rows, invoice_columns),
                              (gst_release_rows, gst_columns),
                              (payment_rows, payment_columns)):
            for total_key, column in columns:
                total[total_key] = column_total(rows, column)

        current_date = datetime.now().strftime('%Y-%m-%d')
    except Exception as e:
        print(f"Error fetching contractor report: {e}")
        return "An error occurred while fetching contractor report", 500
    finally:
        cursor.close()
        connection.close()

    return render_template('subcontractor_report.html',
                           contInfo=contractor_details,
                           contractor_id=contractor_id,
                           invoices=invoice_rows,
                           hold_types=hold_type_rows,
                           gst_rel=gst_release_rows,
                           payments=payment_rows,
                           credit_note=credit_note_rows,
                           hold_release=hold_release_rows,
                           total=total,
                           current_date=current_date)
# Download report by contractor id
class FilePathData:
    """Namespace for file-system locations used by the report downloads."""

    # Folder that generated contractor report workbooks are written to.
    downloadReportFolder = "static/download"
@app.route('/download_report/<int:contractor_id>')
def download_report(contractor_id):
    """Build the contractor's Excel report and send it as a download.

    The workbook holds, in order: the contractor info fields, a header row,
    one row per invoice (with its hold amounts and first matching payment),
    followed where present by payments without an invoice number for the same
    PMC, GST release rows and credit note rows, and finally a totals row.
    A blank yellow row separates consecutive PMC numbers.

    Returns the ``.xlsx`` attachment, or ``("No contractor found", 404)``
    when ``ContractorInfo`` has no data for ``contractor_id``.
    """
    file_name = f"Contractor_Report_{contractor_id}.xlsx"
    # send_from_directory resolves a relative directory against the
    # application root, so write the workbook to that same absolute folder
    # instead of a path relative to the current working directory.
    output_folder = os.path.join(app.root_path, FilePathData.downloadReportFolder)
    os.makedirs(output_folder, exist_ok=True)
    output_file = os.path.join(output_folder, file_name)

    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True, buffered=True)
    try:
        # ---------------- Contractor Info ----------------
        contractor = ContractorInfo(contractor_id)
        contInfo = contractor.contInfo
        if not contInfo:
            return "No contractor found", 404

        # ---------------- Hold Types ----------------
        cursor.execute("""
            SELECT DISTINCT ht.hold_type_id, ht.hold_type
            FROM invoice_subcontractor_hold_join h
            JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
            WHERE h.Contractor_Id = %s
            """, (contractor_id,))
        hold_types = cursor.fetchall()
        hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}

        # ---------------- Invoices ----------------
        cursor.execute("""
            SELECT i.*, v.Village_Name
            FROM assign_subcontractors asg
            INNER JOIN invoice i ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id
            LEFT JOIN villages v ON i.Village_Id = v.Village_Id
            WHERE asg.Contractor_Id = %s
            ORDER BY i.PMC_No, i.Invoice_No
            """, (contractor_id,))
        # The join can repeat an invoice; keep the first row per Invoice_Id.
        seen_invoice_ids = set()
        invoices = []
        for inv in cursor.fetchall():
            if inv["Invoice_Id"] not in seen_invoice_ids:
                seen_invoice_ids.add(inv["Invoice_Id"])
                invoices.append(inv)

        # ---------------- Hold Amounts ----------------
        cursor.execute("""
            SELECT h.Invoice_Id, h.hold_type_id, h.hold_amount
            FROM invoice_subcontractor_hold_join h
            WHERE h.Contractor_Id = %s
            """, (contractor_id,))
        hold_data = {}  # Invoice_Id -> {hold_type_id: hold_amount}
        for h in cursor.fetchall():
            hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']

        # ---------------- Payments ----------------
        cursor.execute("""
            SELECT DISTINCT p.pmc_no, p.invoice_no, p.Payment_Amount, p.TDS_Payment_Amount, p.Total_amount, p.utr
            FROM payment p
            INNER JOIN invoice i ON i.PMC_No = p.pmc_no AND i.invoice_no = p.invoice_no
            INNER JOIN assign_subcontractors asg ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id
            WHERE asg.Contractor_Id = %s
            """, (contractor_id,))
        payments_map = {}  # (pmc_no, invoice_no) -> [payment rows]
        for pay in cursor.fetchall():
            key = (str(pay['pmc_no']), str(pay['invoice_no']))
            payments_map.setdefault(key, []).append(pay)

        # ---------------- Extra Payments (no invoice_no) ----------------
        # Not filtered by contractor: rows are only used when their PMC number
        # matches one of the invoices written below.
        cursor.execute("""
            SELECT pmc_no, Payment_Amount, TDS_Payment_Amount, Total_amount, utr
            FROM payment
            WHERE (invoice_no IS NULL OR invoice_no = '')
            AND Total_amount != 0
            AND pmc_no IS NOT NULL
            """)
        extra_payments_map = {}  # pmc_no -> [payment dicts]
        for pay in cursor.fetchall():
            extra_payments_map.setdefault(str(pay['pmc_no']), []).append({
                'Payment_Amount': pay['Payment_Amount'],
                'TDS_Payment_Amount': pay['TDS_Payment_Amount'],
                'Total_amount': pay['Total_amount'],
                'utr': pay['utr']
            })

        # ---------------- Credit Notes ----------------
        cursor.execute("""
            SELECT PMC_No, Invoice_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount,
            GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR
            FROM credit_note
            WHERE Contractor_Id = %s
            """, (contractor_id,))
        credit_note_map = {}  # (PMC_No, Invoice_No) -> [credit note rows]
        for cn in cursor.fetchall():
            key = (str(cn['PMC_No']), str(cn['Invoice_No']))
            credit_note_map.setdefault(key, []).append(cn)

        # ---------------- GST Releases ----------------
        cursor.execute("""
            SELECT PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR
            FROM gst_release
            WHERE Contractor_Id = %s
            ORDER BY PMC_No, Invoice_No
            """, (contractor_id,))
        gst_release_map = {}  # (PMC_No, Invoice_No) -> [gst release rows]
        for gr in cursor.fetchall():
            key = (str(gr['PMC_No']), str(gr['Invoice_No']))
            gst_release_map.setdefault(key, []).append(gr)

        # ---------------- Excel Workbook ----------------
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        sheet.title = "Contractor Report"

        # Contractor info block, one "field / value" row per entry
        for field, value in contInfo.items():
            sheet.append([field.replace("_", " "), value])
        sheet.append([])

        # Headers: fixed invoice columns, one column per hold type, payment columns
        base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
                        "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
                        "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
        hold_headers = [ht['hold_type'] for ht in hold_types]
        payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
        all_headers = base_headers + hold_headers + payment_headers
        sheet.append(all_headers)
        header_row = sheet.max_row
        for cell in sheet[header_row]:
            cell.font = Font(bold=True)
            cell.fill = PatternFill(start_color="ADD8E6", end_color="ADD8E6", fill_type="solid")

        # ---------------- Data Rows ----------------
        yellow_fill = PatternFill(start_color="FFFF99", end_color="FFFF99", fill_type="solid")
        empty_hold_cells = [""] * len(hold_headers)
        processed_gst_releases = set()
        appended_credit_keys = set()
        previous_pmc_no = None
        for inv in invoices:
            pmc_no = str(inv["PMC_No"])
            invoice_no = str(inv["invoice_no"])
            key = (pmc_no, invoice_no)

            # Yellow separator row whenever the PMC number changes
            if previous_pmc_no and pmc_no != previous_pmc_no:
                sheet.append([""] * len(all_headers))
                for cell in sheet[sheet.max_row]:
                    cell.fill = yellow_fill
            previous_pmc_no = pmc_no

            # Invoice row
            row = [
                pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"],
                inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"],
                inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"],
                inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
            ]
            # Hold values, in the same order as hold_headers
            invoice_holds = hold_data.get(inv["Invoice_Id"], {})
            row += [invoice_holds.get(ht_id, "") for ht_id in hold_type_map]
            # Only the first payment recorded for this invoice is shown
            payment = payments_map.get(key, [None])[0]
            row += [
                inv["Final_Amount"],
                payment["Payment_Amount"] if payment else "",
                payment["TDS_Payment_Amount"] if payment else "",
                payment["Total_amount"] if payment else "",
                payment["utr"] if payment and payment.get("utr") else ""
            ]
            sheet.append(row)

            # Extra payments for this PMC, written once after its first invoice
            if pmc_no in extra_payments_map:
                for ep in extra_payments_map.pop(pmc_no):
                    extra_row = [pmc_no] + [""] * (len(base_headers) - 1)
                    extra_row += empty_hold_cells
                    extra_row += [
                        "", ep["Payment_Amount"], ep["TDS_Payment_Amount"], ep["Total_amount"], ep.get("utr", "")
                    ]
                    sheet.append(extra_row)

            # GST releases, written once per (PMC, invoice) key
            if key in gst_release_map and key not in processed_gst_releases:
                for gr in gst_release_map[key]:
                    gst_row = [
                        pmc_no, "", "", "GST Release Note", "", gr["Invoice_No"],
                        gr["Basic_Amount"], "", "", "", "", "", "", "", "", ""
                    ]
                    gst_row += empty_hold_cells
                    gst_row += [gr["Final_Amount"], "", "", gr["Total_Amount"], gr.get("UTR", "")]
                    sheet.append(gst_row)
                processed_gst_releases.add(key)

            # Credit notes, written once per (PMC, invoice) key
            if key in credit_note_map and key not in appended_credit_keys:
                for cn in credit_note_map[key]:
                    cn_row = [
                        pmc_no, "", "", cn.get("Invoice_Details", "Credit Note"), "", cn.get("Invoice_No", ""),
                        cn.get("Basic_Amount", ""), cn.get("Debit_Amount", ""), cn.get("After_Debit_Amount", ""),
                        cn.get("GST_Amount", ""), cn.get("Amount", ""), "", "", "", "", ""
                    ]
                    cn_row += empty_hold_cells
                    cn_row += [cn.get("Final_Amount", ""), "", "", cn.get("Total_Amount", ""), cn.get("UTR", "")]
                    sheet.append(cn_row)
                appended_credit_keys.add(key)

        # ---------------- Totals ----------------
        # Summed from the sheet itself, so every written row (invoice, extra
        # payment, GST release, credit note) contributes to the totals.
        hold_start = len(base_headers)
        hold_end = hold_start + len(hold_headers)
        total_basic_amount = total_tds_amount = total_sd_amount = total_on_commission = 0
        total_final_amount = total_total_amount = 0
        hold_totals = [0] * len(hold_headers)  # one running total per hold column
        for r in sheet.iter_rows(min_row=header_row + 1, max_row=sheet.max_row, values_only=True):
            try:
                basic = float(r[6] or 0)
                tds = float(r[11] or 0)
                sd = float(r[12] or 0)
                commission = float(r[13] or 0)
                final = float(r[-5] or 0)
                paid = float(r[-2] or 0)
                holds = [float(value or 0) for value in r[hold_start:hold_end]]
            except (TypeError, ValueError, IndexError):
                # A row with a non-numeric cell in an amount column is left
                # out of the totals entirely.
                continue
            total_basic_amount += basic
            total_tds_amount += tds
            total_sd_amount += sd
            total_on_commission += commission
            total_final_amount += final
            total_total_amount += paid
            hold_totals = [running + value for running, value in zip(hold_totals, holds)]

        totals_row = [
            "Total", "", "", "", "", "", total_basic_amount, "", "", "", "", total_tds_amount,
            total_sd_amount, total_on_commission, "", ""
        ]
        # Each hold column shows its own total, not the sum over all hold types.
        totals_row += hold_totals
        totals_row += [total_final_amount, "", "", total_total_amount, ""]
        sheet.append([])
        sheet.append(totals_row)
        for cell in sheet[sheet.max_row]:
            cell.font = Font(bold=True)

        # ---------------- Column Width ----------------
        # Width = longest non-empty cell text in the column, plus padding.
        for col in sheet.columns:
            max_length = 0
            col_letter = openpyxl.utils.get_column_letter(col[0].column)
            for cell in col:
                if cell.value:
                    max_length = max(max_length, len(str(cell.value)))
            sheet.column_dimensions[col_letter].width = max_length + 2

        workbook.save(output_file)
        workbook.close()
    finally:
        cursor.close()
        connection.close()
    return send_from_directory(output_folder, file_name, as_attachment=True)
def _pmc_sum_column(rows, column):
    """Sum ``column`` over dict ``rows``; a missing key or falsy value (e.g. NULL) counts as 0."""
    return sum(row.get(column, 0) or 0 for row in rows)


@app.route('/pmc_report/<pmc_no>')
def pmc_report(pmc_no):
    """Render the on-screen report for a single PMC number.

    Collects, for ``pmc_no``: the contractor/location header (stored
    procedure ``GetContractorInfoByPmcNo``), the hold types in use (stored
    procedure ``Get_pmc_hold_types``), the invoices (joined to their hold
    rows when hold types exist), GST releases, hold releases, credit notes
    and payments, plus a dictionary of column totals.

    Args:
        pmc_no: PMC number taken from the URL path.

    Returns:
        The rendered ``pmc_report.html`` template on success (context keys:
        ``info``, ``invoices``, ``hold_types``, ``gst_rel``, ``payments``,
        ``credit_note``, ``hold_release``, ``total``);
        ``("No PMC found with this number", 404)`` when the header procedure
        yields no row; or
        ``("An error occurred while fetching PMC report", 500)`` when any
        exception is raised while querying.
    """
    log = logging.getLogger(__name__)
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True, buffered=True)
    try:
        # 1. PMC header info. The procedure replaced an inline join over
        #    assign_subcontractors / villages / blocks / districts / states /
        #    subcontractors filtered on pmc_no.
        cursor.callproc("GetContractorInfoByPmcNo", (pmc_no,))
        pmc_info = next(cursor.stored_results()).fetchone()
        if not pmc_info:
            return "No PMC found with this number", 404
        contractor_id = pmc_info["Contractor_Id"]

        # 2. Distinct hold types used by this PMC/contractor pair.
        cursor.callproc("Get_pmc_hold_types", (pmc_no, contractor_id))
        hold_types = next(cursor.stored_results()).fetchall()
        hold_type_ids = [ht['hold_type_id'] for ht in hold_types]

        # 3. Invoice query. Both variants share the same columns, joins and
        #    ordering; the hold join, its two extra columns and its filter are
        #    only added when there is at least one hold type. Only constant
        #    SQL fragments and "%s" placeholders are interpolated here; all
        #    values are passed separately through ``params``.
        hold_columns = hold_joins = hold_filter = ""
        params = [pmc_no, contractor_id]
        if hold_type_ids:
            placeholders = ','.join(['%s'] * len(hold_type_ids))
            hold_columns = ", h.hold_amount, ht.hold_type"
            hold_joins = """
            LEFT JOIN invoice_subcontractor_hold_join h ON i.Invoice_Id = h.Invoice_Id
            LEFT JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id"""
            hold_filter = f"AND (ht.hold_type_id IS NULL OR ht.hold_type_id IN ({placeholders}))"
            params += hold_type_ids

        query = f"""
            SELECT DISTINCT i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details,
                   i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount,
                   i.After_Debit_Amount, i.Amount, i.GST_Amount, i.TDS_Amount, i.SD_Amount,
                   i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount,
                   i.Final_Amount{hold_columns}
            FROM invoice i
            LEFT JOIN villages v ON i.Village_Id = v.Village_Id
            LEFT JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No{hold_joins}
            WHERE a.PMC_No = %s AND a.Contractor_Id = %s
            {hold_filter}
            ORDER BY i.Invoice_Date, i.Invoice_No
        """
        cursor.execute(query, params)
        invoices = cursor.fetchall()

        # NOTE(review): when hold types exist the result has one row per
        # (invoice, hold) pair, so an invoice with several holds appears
        # several times and is counted that many times in the invoice-level
        # sums below. Behaviour is kept as-is; confirm against the template
        # whether the totals should be taken over distinct invoices instead.
        hold_amount_total = _pmc_sum_column(invoices, 'hold_amount') if hold_type_ids else 0

        # 4. GST releases.
        cursor.execute("""
            SELECT pmc_no, invoice_no, basic_amount, final_amount
            FROM gst_release
            WHERE pmc_no = %s
            ORDER BY invoice_no ASC
        """, (pmc_no,))
        gst_rel = cursor.fetchall()

        # 5. Hold releases.
        cursor.execute("""select * from hold_release where pmc_no=%s""", (pmc_no,))
        hold_release = cursor.fetchall()
        # Debug-level only: full financial rows should not be written to
        # stdout on every request.
        log.debug("Hold release rows for PMC %s: %s", pmc_no, hold_release)

        # 6. Credit notes.
        cursor.execute("select * from credit_note where pmc_no=%s", (pmc_no,))
        credit_note = cursor.fetchall()
        log.debug("Credit note rows for PMC %s: %s", pmc_no, credit_note)

        # 7. Payments.
        cursor.execute("""
            SELECT pmc_no, invoice_no, Payment_Amount, TDS_Payment_Amount, Total_amount, utr
            FROM payment
            WHERE pmc_no = %s
            ORDER BY invoice_no ASC
        """, (pmc_no,))
        payments = cursor.fetchall()

        # 8. Totals handed to the template. Keys are read by the template and
        #    therefore kept exactly as they were.
        totals = {
            "sum_invo_basic_amt": _pmc_sum_column(invoices, 'Basic_Amount'),
            "sum_invo_debit_amt": _pmc_sum_column(invoices, 'Debit_Amount'),
            "sum_invo_after_debit_amt": _pmc_sum_column(invoices, 'After_Debit_Amount'),
            "sum_invo_amt": _pmc_sum_column(invoices, 'Amount'),
            "sum_invo_gst_amt": _pmc_sum_column(invoices, 'GST_Amount'),
            "sum_invo_tds_amt": _pmc_sum_column(invoices, 'TDS_Amount'),
            "sum_invo_ds_amt": _pmc_sum_column(invoices, 'SD_Amount'),
            "sum_invo_on_commission": _pmc_sum_column(invoices, 'On_Commission'),
            "sum_invo_hydro_test": _pmc_sum_column(invoices, 'Hydro_Testing'),
            "sum_invo_gst_sd_amt": _pmc_sum_column(invoices, 'GST_SD_Amount'),
            "sum_invo_final_amt": _pmc_sum_column(invoices, 'Final_Amount'),
            "sum_invo_hold_amt": hold_amount_total,
            "sum_gst_basic_amt": _pmc_sum_column(gst_rel, 'basic_amount'),
            "sum_gst_final_amt": _pmc_sum_column(gst_rel, 'final_amount'),
            "sum_pay_payment_amt": _pmc_sum_column(payments, 'Payment_Amount'),
            "sum_pay_tds_payment_amt": _pmc_sum_column(payments, 'TDS_Payment_Amount'),
            "sum_pay_total_amt": _pmc_sum_column(payments, 'Total_amount'),
        }
    except Exception:
        # Route boundary: record the full traceback, answer with a generic 500.
        log.exception("Error fetching PMC report for PMC %s", pmc_no)
        return "An error occurred while fetching PMC report", 500
    finally:
        # Runs on every exit path, including the early 404/500 returns.
        cursor.close()
        connection.close()

    return render_template(
        'pmc_report.html',
        info=pmc_info,
        invoices=invoices,
        hold_types=hold_types,
        gst_rel=gst_rel,
        payments=payments,
        credit_note=credit_note,
        hold_release=hold_release,
        total=totals
    )
# # Download report by PMC No
# @app.route('/download_pmc_report/<pmc_no>')
# def download_pmc_report(pmc_no):
# connection = config.get_db_connection()
# output_folder = "static/download"
# output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
# if not os.path.exists(output_folder):
# os.makedirs(output_folder)
# cursor = connection.cursor(dictionary=True)
# try:
# # # Fetch Contractor Details using PMC No
# # cursor.execute("""
# # SELECT DISTINCT s.Contractor_Id, s.Contractor_Name, st.State_Name, d.District_Name, b.Block_Name,
# # s.Mobile_No, s.GST_Registration_Type, s.GST_No, s.PAN_No, s.Email, s.Address
# # FROM subcontractors s
# # LEFT JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id
# # LEFT JOIN villages v ON asg.Village_Id = v.Village_Id
# # LEFT JOIN blocks b ON v.Block_Id = b.Block_Id
# # LEFT JOIN districts d ON b.District_id = d.District_id
# # LEFT JOIN states st ON d.State_Id = st.State_Id
# # WHERE asg.PMC_No = %s
# # """, (pmc_no,))
# # contractor_info = cursor.fetchone()
# cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
# # Now fetch the result:
# for result in cursor.stored_results():
# contractor_info = result.fetchone()
# if not contractor_info:
# return "No contractor found for this PMC No", 404
# # # Fetch distinct hold types present for the contractor
# # cursor.execute("""
# # SELECT DISTINCT ht.hold_type_id, ht.hold_type
# # FROM invoice_subcontractor_hold_join h
# # JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
# # WHERE h.Contractor_Id = %s
# # """, (contractor_info["Contractor_Id"],))
# # hold_types = cursor.fetchall()
# cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
# for result in cursor.stored_results():
# hold_types = result.fetchall()
# hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# # # # Fetch Invoices & GST Releases
# # cursor.execute("""
# # SELECT DISTINCT i.Invoice_Id, i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details,
# # i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount,
# # i.After_Debit_Amount, i.GST_Amount, i.Amount, i.TDS_Amount, i.SD_Amount,
# # i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount, i.Final_Amount,
# # g.pmc_no AS gst_pmc_no, g.invoice_no AS gst_invoice_no,
# # g.basic_amount AS gst_basic_amount, g.final_amount AS gst_final_amount
# # FROM invoice i
# # LEFT JOIN assign_subcontractors asg ON i.PMC_No = asg.PMC_No
# # LEFT JOIN villages v ON i.Village_Id = v.Village_Id
# # LEFT JOIN gst_release g ON i.PMC_No = g.pmc_no AND i.Invoice_No = g.invoice_no
# # WHERE asg.PMC_No = %s
# # ORDER BY i.Invoice_Date, i.Invoice_No
# # """, (pmc_no,))
# # invoices = cursor.fetchall()
# cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
# for result in cursor.stored_results():
# invoices = result.fetchall()
# print("pmc_report invoice data:",invoices)
# # cursor.callproc('GetInvoicesAndGSTReleasesByPMC', [pmc_no])
# # for result in cursor.stored_results():
# # invoices = result.fetchall()
# # # Fetch Hold Amounts separately
# # cursor.execute("""
# # SELECT h.Invoice_Id, ht.hold_type_id, h.hold_amount
# # FROM invoice_subcontractor_hold_join h
# # JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
# # WHERE h.Contractor_Id = %s
# # """, (contractor_info["Contractor_Id"],))
# # hold_amounts = cursor.fetchall()
# cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
# for result in cursor.stored_results():
# hold_amounts = result.fetchall()
# # Create a mapping of invoice_id to hold amounts by type
# hold_data = {}
# for h in hold_amounts:
# hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# # # Fetch all Payments for the PMC number
# # cursor.execute("""
# # SELECT pmc_no, invoice_no, Payment_Amount, TDS_Payment_Amount, Total_amount, UTR
# # FROM payment
# # WHERE pmc_no = %s
# # ORDER BY invoice_no
# # """, (pmc_no,))
# # all_payments = cursor.fetchall()
# cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
# for result in cursor.stored_results():
# all_payments = result.fetchall()
# # Organize payments by Invoice No (both regular and GST release notes)
# payments_map = {}
# extra_payments = []
# for pay in all_payments:
# if pay['invoice_no']:
# key = pay['invoice_no']
# if key not in payments_map:
# payments_map[key] = []
# payments_map[key].append(pay)
# else:
# extra_payments.append(pay)
# # Create Excel workbook
# workbook = openpyxl.Workbook()
# sheet = workbook.active
# sheet.title = "PMC Report"
# # Write Contractor Details
# sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD.", "", ""])
# sheet.append(
# ["Contractor Name", contractor_info["Contractor_Name"], " ", "GST No", contractor_info["GST_No"], " ",
# "GST Type", contractor_info["GST_Registration_Type"]])
# sheet.append(["State", contractor_info["State_Name"], " ", "PAN No", contractor_info["PAN_No"], " ", "Address",
# contractor_info["Address"]])
# sheet.append(["District", contractor_info["District_Name"], " ", "Mobile No", contractor_info["Mobile_No"]])
# sheet.append(["Block", contractor_info["Block_Name"], " ", "Email", contractor_info["Email"]])
# sheet.append([])
# # Table Headers - include all hold types as separate columns
# base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
# "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
# "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
# hold_headers = [ht['hold_type'] for ht in hold_types]
# payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
# sheet.append(base_headers + hold_headers + payment_headers)
# seen_invoices = set()
# seen_gst_notes = set()
# processed_payments = set()
# # Process invoices
# for inv in invoices:
# invoice_no = inv["Invoice_No"]
# payments = payments_map.get(invoice_no, [])
# # Process invoice row with first payment (if exists)
# if invoice_no not in seen_invoices:
# seen_invoices.add(invoice_no)
# first_payment = payments[0] if len(payments) > 0 else None
# # Base invoice data
# row = [
# pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"],
# inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"],
# inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"],
# inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
# ]
# # Add hold amounts for each hold type
# invoice_holds = hold_data.get(inv["Invoice_Id"], {})
# for ht_id in hold_type_map.keys():
# row.append(invoice_holds.get(ht_id, ""))
# # Add payment information
# row += [
# inv["Final_Amount"],
# first_payment["Payment_Amount"] if first_payment else "",
# first_payment["TDS_Payment_Amount"] if first_payment else "",
# first_payment["Total_amount"] if first_payment else "",
# first_payment["UTR"] if first_payment else ""
# ]
# sheet.append(row)
# if first_payment:
# payment_id = f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}"
# processed_payments.add(payment_id)
# # Process GST release if exists (only if we have a matching GST record)
# if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes:
# seen_gst_notes.add(inv["gst_invoice_no"])
# # Find the payment that matches this GST release
# gst_payment = None
# for payment in payments[1:]: # Skip first payment (already used for invoice)
# if payment['invoice_no'] == inv["gst_invoice_no"]:
# gst_payment = payment
# break
# # If no payment found in the invoice's payments, check all payments
# if not gst_payment:
# gst_payments = payments_map.get(inv["gst_invoice_no"], [])
# if gst_payments:
# gst_payment = gst_payments[0]
# # GST release row
# row = [
# pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"],
# inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # Empty GST SD Amount
# ]
# # Empty holds for GST release
# row += ["" for _ in hold_headers]
# # Add payment information
# row += [
# inv["gst_final_amount"],
# gst_payment["Payment_Amount"] if gst_payment else "",
# gst_payment["TDS_Payment_Amount"] if gst_payment else "",
# gst_payment["Total_amount"] if gst_payment else "",
# gst_payment["UTR"] if gst_payment else ""
# ]
# sheet.append(row)
# if gst_payment:
# payment_id = f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}"
# processed_payments.add(payment_id)
# # Process remaining payments as extra payments
# for payment in payments[1:]:
# payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}"
# if payment_id not in processed_payments:
# row = [
# pmc_no, "", "", "", "", payment['invoice_no'],
# "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount
# ]
# # Empty holds for extra payments
# row += ["" for _ in hold_headers]
# # Add payment information
# row += [
# "",
# payment["Payment_Amount"],
# payment["TDS_Payment_Amount"],
# payment["Total_amount"],
# payment["UTR"]
# ]
# sheet.append(row)
# processed_payments.add(payment_id)
# # Process extra payments (null invoice_no)
# for payment in extra_payments:
# payment_id = f"null-{payment['Payment_Amount']}-{payment.get('UTR', '')}"
# if payment_id not in processed_payments:
# row = [
# pmc_no, "", "", "", "", "",
# "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount
# ]
# # Empty holds for null invoice payments
# row += ["" for _ in hold_headers]
# # Add payment information
# row += [
# "",
# payment["Payment_Amount"],
# payment["TDS_Payment_Amount"],
# payment["Total_amount"],
# payment["UTR"]
# ]
# sheet.append(row)
# processed_payments.add(payment_id)
# # Calculate totals
# total_basic_amount = 0
# total_tds_amount = 0
# total_sd_amount = 0
# total_on_commission = 0
# total_hold_amount = 0
# total_final_amount = 0
# total_payment_amount = 0
# total_tds_payment_amount = 0
# total_total_paid = 0
# for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, values_only=True):
# try:
# total_basic_amount += float(row[6] or 0) # Basic_Amount
# total_tds_amount += float(row[11] or 0) # TDS_Amount
# total_sd_amount += float(row[12] or 0) # SD_Amount
# total_on_commission += float(row[13] or 0) # On_Commission
# total_final_amount += float(row[-5] or 0) # Final_Amount
# total_payment_amount += float(row[-4] or 0) # Payment_Amount
# total_tds_payment_amount += float(row[-3] or 0) # TDS_Payment
# total_total_paid += float(row[-2] or 0) # Total_Paid
# # Sum of hold amounts
# hold_start_col = len(base_headers)
# hold_end_col = hold_start_col + len(hold_headers)
# total_hold_amount += sum(float(row[i] or 0) for i in range(hold_start_col, hold_end_col))
# except (ValueError, IndexError, TypeError):
# continue
# # Append totals row
# totals_row = [
# "TOTAL", "", "", "", "", "",
# total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount,
# total_on_commission, "", "", # Empty GST SD Amount
# ]
# # Add hold totals
# totals_row += [total_hold_amount] + [""] * (len(hold_headers) - 1)
# # Add payment totals
# totals_row += [
# total_final_amount,
# total_payment_amount,
# total_tds_payment_amount,
# total_total_paid,
# "" # UTR column remains empty
# ]
# sheet.append([])
# sheet.append(totals_row)
# # Make totals row bold
# for cell in sheet[sheet.max_row]:
# cell.font = Font(bold=True)
# # Save Excel file
# workbook.save(output_file)
# workbook.close()
# finally:
# cursor.close()
# connection.close()
# return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True)
# @app.route('/download_pmc_report/<pmc_no>')
# def download_pmc_report(pmc_no):
# connection = config.get_db_connection()
# output_folder = "static/download"
# output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
#
# if not os.path.exists(output_folder):
# os.makedirs(output_folder)
#
# cursor = connection.cursor(dictionary=True)
#
# try:
# cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
#
# for result in cursor.stored_results():
# contractor_info = result.fetchone()
#
# if not contractor_info:
# return "No contractor found for this PMC No", 404
#
# cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
#
# for result in cursor.stored_results():
# hold_types = result.fetchall()
#
# hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
#
# cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
#
# for result in cursor.stored_results():
# invoices = result.fetchall()
# total_tds=Decimal('0.00')
# final_amount=Decimal('0.00')
# # total_hold_amount=Decimal('0.00')
# for data in invoices:
# total_tds=total_tds+data.get('TDS_Amount',Decimal('0.00'))
# final_amount=final_amount+data.get('Final_Amount',Decimal('0.00'))
#
# cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
#
# for result in cursor.stored_results():
# hold_amounts = result.fetchall()
#
# hold_data = {}
# for h in hold_amounts:
# hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
#
# cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
#
# for result in cursor.stored_results():
# all_payments = result.fetchall()
# total_amount=Decimal('0.00')
# for d in all_payments:
# total_amount=total_amount+ d.get('Total_Amount',Decimal('0.00'))
# total_amount_paid= final_amount- total_amount;
# payments_map = {}
# extra_payments = []
# for pay in all_payments:
# if pay['invoice_no']:
# key = pay['invoice_no']
# if key not in payments_map:
# payments_map[key] = []
# payments_map[key].append(pay)
# else:
# extra_payments.append(pay)
#
# workbook = openpyxl.Workbook()
# sheet = workbook.active
# sheet.title = "PMC Report"
#
# # Write Contractor Details
# sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD.", "", ""])
# sheet.append(
# ["Contractor Name", contractor_info["Contractor_Name"], " ", "GST No", contractor_info["GST_No"], " ",
# "GST Type", contractor_info["GST_Registration_Type"]])
# sheet.append(["State", contractor_info["State_Name"], " ", "PAN No", contractor_info["PAN_No"], " ", "Address",
# contractor_info["Address"]])
# sheet.append(["District", contractor_info["District_Name"], " ", "Mobile No", contractor_info["Mobile_No"]])
# sheet.append(["Block", contractor_info["Block_Name"], " ", "Email", contractor_info["Email"]])
# sheet.append([])
#
# # Table Headers - include all hold types as separate columns
# base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
# "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
# "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
#
# hold_headers = [ht['hold_type'] for ht in hold_types]
#
# payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
#
# sheet.append(base_headers + hold_headers + payment_headers)
#
# seen_invoices = set()
# seen_gst_notes = set()
# processed_payments = set()
#
# # Process invoices
# for inv in invoices:
# invoice_no = inv["Invoice_No"]
# payments = payments_map.get(invoice_no, [])
#
# # Process invoice row with first payment (if exists)
# if invoice_no not in seen_invoices:
# seen_invoices.add(invoice_no)
# first_payment = payments[0] if len(payments) > 0 else None
#
# # Base invoice data
# row = [
# pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"],
# inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"],
# inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"],
# inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
# ]
#
# # Add hold amounts for each hold type
# invoice_holds = hold_data.get(inv["Invoice_Id"], {})
# for ht_id in hold_type_map.keys():
# row.append(invoice_holds.get(ht_id, ""))
#
# # Add payment information
# row += [
# inv["Final_Amount"],
# first_payment["Payment_Amount"] if first_payment else "",
# first_payment["TDS_Payment_Amount"] if first_payment else "",
# first_payment["Total_amount"] if first_payment else "",
# first_payment["UTR"] if first_payment else ""
# ]
#
# sheet.append(row)
#
# if first_payment:
# payment_id = f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}"
# processed_payments.add(payment_id)
#
# # Process GST release if exists (only if we have a matching GST record)
# if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes:
# seen_gst_notes.add(inv["gst_invoice_no"])
#
# # Find the payment that matches this GST release
# gst_payment = None
# for payment in payments[1:]: # Skip first payment (already used for invoice)
# if payment['invoice_no'] == inv["gst_invoice_no"]:
# gst_payment = payment
# break
#
# # If no payment found in the invoice's payments, check all payments
# if not gst_payment:
# gst_payments = payments_map.get(inv["gst_invoice_no"], [])
# if gst_payments:
# gst_payment = gst_payments[0]
#
# # GST release row (this will be in the same row, after the invoice information)
# gst_row = [
# pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"],
# inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # Empty GST SD Amount
# ]
#
# # Empty holds for GST release
# gst_row += ["" for _ in hold_headers]
#
# # Add GST payment information (same columns as invoice payment information)
# gst_row += [
# inv["gst_final_amount"],
# gst_payment["Payment_Amount"] if gst_payment else "",
# gst_payment["TDS_Payment_Amount"] if gst_payment else "",
# gst_payment["Total_amount"] if gst_payment else "",
# gst_payment["UTR"] if gst_payment else ""
# ]
#
# sheet.append(gst_row)
#
# if gst_payment:
# payment_id = f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}"
# processed_payments.add(payment_id)
#
# # Process remaining payments as extra payments (if any)
# for payment in payments[1:]:
# payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}"
# if payment_id not in processed_payments:
# row = [
# pmc_no, "", "", "", "", payment['invoice_no'],
# "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount
# ]
#
# # Empty holds for extra payments
# row += ["" for _ in hold_headers]
#
# # Add payment information
# row += [
# "",
# payment["Payment_Amount"],
# payment["TDS_Payment_Amount"],
# payment["Total_amount"],
# payment["UTR"]
# ]
#
# sheet.append(row)
# processed_payments.add(payment_id)
#
# # Process extra payments (null invoice_no)
# for payment in extra_payments:
# payment_id = f"null-{payment['Payment_Amount']}-{payment.get('UTR', '')}"
# if payment_id not in processed_payments:
# row = [
# pmc_no, "", "", "", "", "",
# "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount
# ]
#
# # Empty holds for null invoice payments
# row += ["" for _ in hold_headers]
#
# # Add payment information
# row += [
# "",
# payment["Payment_Amount"],
# payment["TDS_Payment_Amount"],
# payment["Total_amount"],
# payment["UTR"]
# ]
#
# sheet.append(row)
# processed_payments.add(payment_id)
#
# # Calculate totals
# total_basic_amount = 0
# total_tds_amount = 0
# total_sd_amount = 0
# total_on_commission = 0
# total_hold_amount = 0
# total_final_amount = 0
# total_payment_amount = 0
# total_tds_payment_amount = 0
# total_total_paid = 0
#
# for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, values_only=True):
# try:
# total_basic_amount += float(row[6] or 0) # Basic_Amount
# total_tds_amount += float(row[11] or 0) # TDS_Amount
# total_sd_amount += float(row[12] or 0) # SD_Amount
# total_on_commission += float(row[13] or 0) # On_Commission
# total_final_amount += float(row[-5] or 0) # Final_Amount
# total_payment_amount += float(row[-4] or 0) # Payment_Amount
# total_tds_payment_amount += float(row[-3] or 0) # TDS_Payment
# total_total_paid += float(row[-2] or 0) # Total_Paid
#
# # Sum of hold amounts
# hold_start_col = len(base_headers)
# hold_end_col = hold_start_col + len(hold_headers)
# total_hold_amount += sum(float(row[i] or 0) for i in range(hold_start_col, hold_end_col))
# except (ValueError, IndexError, TypeError):
# continue
#
# # Append totals row
# totals_row = [
# "TOTAL", "", "", "", "", "",
# total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount,
# total_on_commission, "", "", # Empty GST SD Amount
# ]
# if hold_headers:
# totals_row += [total_hold_amount] + [""] * (len(hold_headers) - 1)
#
# # Add payment totals
# totals_row += [
# total_final_amount,
# total_payment_amount,
# total_tds_payment_amount,
# total_total_paid,
# "" # UTR column remains empty
# ]
#
# sheet.append([])
# sheet.append(totals_row)
# #new code added for small chart---summary
# total_hold_amount=Decimal('0.00')
# for d in invoices:
# total_hold_amount = total_hold_amount + d.get('SD_Amount', Decimal('0.00')) + d.get('On_Commission',
# Decimal(
# '0.00')) + d.get(
# 'Hydro_Testing', Decimal('0.00'))
# for data in hold_amounts:
# total_hold_amount = total_hold_amount + data.get('hold_amount', Decimal('0.00'))
# print("Total Hold Amount after adding the hold amount ", total_hold_amount)
#
# # Add payment information
# # Get today's date
# today_date = datetime.today().strftime('%A,%Y-%m-%d')
# # Add headers (optional)
# sheet.append(["Contractor Name", contractor_info["Contractor_Name"]])
# sheet.append(["Date", today_date])
# sheet.append(["Description", "Amount"])
# # Add your values
# sheet.append(["Advance/Surplus", str(total_final_amount-total_payment_amount)])
# sheet.append(["Total Hold Amount", str(total_hold_amount)])
# sheet.append(["Amount With TDS", str(total_tds_payment_amount)])
# # new coded ended here for summary chart
# # Make totals row bold
# for cell in sheet[sheet.max_row]:
# cell.font = Font(bold=True)
#
# # Save Excel file
# workbook.save(output_file)
# workbook.close()
#
# finally:
# cursor.close()
# connection.close()
#
# return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True)
# @app.route('/download_pmc_report/<pmc_no>')
# def download_pmc_report(pmc_no):
# connection = config.get_db_connection()
# output_folder = "static/download"
# output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
#
# if not os.path.exists(output_folder):
# os.makedirs(output_folder)
#
# cursor = connection.cursor(dictionary=True)
#
# try:
# cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
# contractor_info = next(cursor.stored_results()).fetchone()
#
# if not contractor_info:
# return "No contractor found for this PMC No", 404
#
# cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
# hold_types = next(cursor.stored_results()).fetchall()
# hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
#
# cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
# invoices = next(cursor.stored_results()).fetchall()
#
# cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
# hold_amounts = next(cursor.stored_results()).fetchall()
# hold_data = {}
# for h in hold_amounts:
# hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
#
# cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
# all_payments = next(cursor.stored_results()).fetchall()
#
# payments_map = {}
# extra_payments = []
# for pay in all_payments:
# if pay['invoice_no']:
# payments_map.setdefault(pay['invoice_no'], []).append(pay)
# else:
# extra_payments.append(pay)
#
# workbook = openpyxl.Workbook()
# sheet = workbook.active
# sheet.title = "PMC Report"
#
# # Write contractor header
# sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
# sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]])
# sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]])
# sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]])
# sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]])
# sheet.append([])
#
# base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
# "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
# "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
#
# hold_headers = [ht['hold_type'] for ht in hold_types]
# payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
# sheet.append(base_headers + hold_headers + payment_headers)
# # Style the headers
# header_fill=PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid")
# header_font=Font(bold=True)
# for cell in sheet[sheet.max_row]:
# cell.font=header_font
# cell.fill=header_fill
#
# seen_invoices = set()
# seen_gst_notes = set()
# processed_payments = set()
#
# for inv in invoices:
# invoice_no = inv["Invoice_No"]
# payments = payments_map.get(invoice_no, [])
#
# if invoice_no not in seen_invoices:
# seen_invoices.add(invoice_no)
# first_payment = payments[0] if payments else None
#
# row = [
# pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"],
# inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"],
# inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"],
# inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
# ]
#
# invoice_holds = hold_data.get(inv["Invoice_Id"], {})
# for ht_id in hold_type_map.keys():
# row.append(invoice_holds.get(ht_id, ""))
#
# row += [
# inv["Final_Amount"],
# first_payment["Payment_Amount"] if first_payment else "",
# first_payment["TDS_Payment_Amount"] if first_payment else "",
# first_payment["Total_amount"] if first_payment else "",
# first_payment["UTR"] if first_payment else ""
# ]
#
# sheet.append(row)
#
# if first_payment:
# processed_payments.add(f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}")
#
# if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes:
# seen_gst_notes.add(inv["gst_invoice_no"])
# gst_payment = None
# for payment in payments[1:]:
# if payment['invoice_no'] == inv["gst_invoice_no"]:
# gst_payment = payment
# break
# if not gst_payment:
# gst_payment = payments_map.get(inv["gst_invoice_no"], [None])[0]
#
# gst_row = [
# pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"],
# inv["gst_basic_amount"], "", "", "", "", "", "", "", "", ""
# ]
# gst_row += ["" for _ in hold_headers]
# gst_row += [
# inv["gst_final_amount"],
# gst_payment["Payment_Amount"] if gst_payment else "",
# gst_payment["TDS_Payment_Amount"] if gst_payment else "",
# gst_payment["Total_amount"] if gst_payment else "",
# gst_payment["UTR"] if gst_payment else ""
# ]
# sheet.append(gst_row)
# if gst_payment:
# processed_payments.add(f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}")
#
# for payment in payments[1:]:
# payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}"
# if payment_id not in processed_payments:
# row = [pmc_no, "", "", "", "", payment['invoice_no']] + [""] * 10
# row += ["" for _ in hold_headers]
# row += [
# "", payment["Payment_Amount"], payment["TDS_Payment_Amount"],
# payment["Total_amount"], payment["UTR"]
# ]
# sheet.append(row)
# processed_payments.add(payment_id)
#
# for payment in extra_payments:
# row = [pmc_no, "", "", "", "", ""] + [""] * 10
# row += ["" for _ in hold_headers]
# row += [
# "", payment["Payment_Amount"], payment["TDS_Payment_Amount"],
# payment["Total_amount"], payment["UTR"]
# ]
# sheet.append(row)
#
# # Totals
# total_basic_amount = Decimal('0.00')
# total_tds_amount = Decimal('0.00')
# total_sd_amount = Decimal('0.00')
# total_on_commission = Decimal('0.00')
# total_final_amount = Decimal('0.00')
# total_payment_amount = Decimal('0.00')
# total_tds_payment_amount = Decimal('0.00')
# total_total_paid = Decimal('0.00')
# total_hold_amount_dynamic = Decimal('0.00')
#
# for row in sheet.iter_rows(min_row=8, max_row=sheet.max_row, values_only=True):
# try:
# total_basic_amount += Decimal(str(row[6] or 0))
# total_tds_amount += Decimal(str(row[11] or 0))
# total_sd_amount += Decimal(str(row[12] or 0))
# total_on_commission += Decimal(str(row[13] or 0))
# total_final_amount += Decimal(str(row[-5] or 0))
# total_payment_amount += Decimal(str(row[-4] or 0))
# total_tds_payment_amount += Decimal(str(row[-3] or 0))
# total_total_paid += Decimal(str(row[-2] or 0))
#
# for i in range(len(base_headers), len(base_headers) + len(hold_headers)):
# total_hold_amount_dynamic += Decimal(str(row[i] or 0))
# except:
# continue
#
# totals_row = [
# "TOTAL", "", "", "", "", "",
# total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount,
# total_on_commission, "", ""
# ]
# totals_row += [total_hold_amount_dynamic] + [""] * (len(hold_headers) - 1)
# totals_row += [
# total_final_amount,
# total_payment_amount,
# total_tds_payment_amount,
# total_total_paid,
# ""
# ]
#
# sheet.append([])
# sheet.append(totals_row)
#
# # Summary
# summary_hold = Decimal('0.00')
# for d in invoices:
# summary_hold += Decimal(str(d.get('SD_Amount', 0.00))) + Decimal(str(d.get('On_Commission', 0.00))) + Decimal(str(d.get('Hydro_Testing', 0.00)))
# for h in hold_amounts:
# summary_hold += Decimal(str(h.get('hold_amount', 0.00)))
#
# sheet.append([])
# today = datetime.today().strftime('%A, %Y-%m-%d')
# sheet.append(["Contractor Name", contractor_info["Contractor_Name"]])
# sheet.append(["Date", today])
# sheet.append(["Description", "Amount"])
# sheet.append(["Advance/Surplus", str(total_final_amount - total_payment_amount)])
# sheet.append(["Total Hold Amount", str(summary_hold)])
# sheet.append(["Amount With TDS", str(total_payment_amount + total_tds_payment_amount)])
#
# for cell in sheet[sheet.max_row]:
# cell.font = Font(bold=True)
#
# workbook.save(output_file)
# workbook.close()
#
# finally:
# cursor.close()
# connection.close()
#
# return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True)
def _pmc_to_decimal(value):
    """Convert a worksheet cell value to ``Decimal``.

    ``None``, empty strings and non-numeric text count as zero so that one
    unexpected cell cannot discard the remaining columns of its row.
    """
    from decimal import InvalidOperation  # the file only imports ``Decimal``

    if value is None or value == "":
        return Decimal("0")
    try:
        return Decimal(str(value))
    except (InvalidOperation, ValueError):
        return Decimal("0")


def _pmc_split_invoice_numbers(raw):
    """Split a ``gst_release.Invoice_No`` value into individual invoice numbers.

    Spaces are removed and the value is split on ``&`` and ``,``; empty
    tokens are dropped.
    """
    if not raw:
        return []
    cleaned = raw.replace(' ', '')
    return [token for token in re.split(r'[&,]', cleaned) if token]


@app.route('/download_pmc_report/<pmc_no>')
@login_required
def download_pmc_report(pmc_no):
    """Build the Excel PMC report for ``pmc_no`` and send it as a download.

    The workbook is written to ``static/download/PMC_Report_<pmc_no>.xlsx``.
    It contains a contractor header, one row per distinct invoice followed by
    that invoice's GST release notes, additional payments and credit notes,
    then payments without an invoice number, a totals row and a short summary.

    Returns a 404 text response when ``GetContractorDetailsByPMC`` yields no
    row. Login is required because the audit log entry reads
    ``current_user.id``.
    """
    output_folder = "static/download"
    output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
    # Create the folder before opening the connection so a filesystem error
    # cannot leave a connection open.
    os.makedirs(output_folder, exist_ok=True)

    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
        contractor_info = next(cursor.stored_results()).fetchone()
        if not contractor_info:
            return "No contractor found for this PMC No", 404

        cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
        hold_types = next(cursor.stored_results()).fetchall()
        # Ids and headers come from the same list so hold cells always line
        # up with the hold header columns.
        hold_type_ids = [ht['hold_type_id'] for ht in hold_types]
        hold_headers = [ht['hold_type'] for ht in hold_types]

        cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
        invoices = next(cursor.stored_results()).fetchall()

        # Credit notes for this PMC. The filter previously compared
        # Contractor_Id with the PMC number; the rows are matched to invoices
        # of this PMC below, so they are selected by PMC_No.
        cursor.execute("""
            SELECT PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount,
                   GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, Invoice_No
            FROM credit_note
            WHERE PMC_No = %s
        """, (pmc_no,))
        credit_note_map = {}
        for cn in cursor.fetchall():
            credit_note_map.setdefault(cn["Invoice_No"], []).append(cn)

        cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
        hold_amounts = next(cursor.stored_results()).fetchall()
        hold_data = {}
        for h in hold_amounts:
            hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']

        cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
        all_payments = next(cursor.stored_results()).fetchall()
        payments_map = {}   # invoice number -> payments, in fetched order
        extra_payments = []  # payments that carry no invoice number
        for pay in all_payments:
            if pay['invoice_no']:
                payments_map.setdefault(pay['invoice_no'], []).append(pay)
            else:
                extra_payments.append(pay)

        # GST release notes, indexed under every invoice number they mention.
        # NOTE(review): a release that names several invoices is written (and
        # summed) once under each of them - confirm that is intended.
        cursor.execute("""
            SELECT PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR
            FROM gst_release
            WHERE PMC_No = %s
        """, (pmc_no,))
        gst_release_map = {}
        for gr in cursor.fetchall():
            for inv_no in _pmc_split_invoice_numbers(gr['Invoice_No']):
                gst_release_map.setdefault(inv_no, []).append(gr)

        LogHelper.log_action("Download PMC Report", f"User {current_user.id} Download PMC Report'{ pmc_no}'")

        workbook = openpyxl.Workbook()
        sheet = workbook.active
        sheet.title = "PMC Report"

        # Contractor header
        sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
        sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]])
        sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]])
        sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]])
        sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]])
        sheet.append([])

        base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
                        "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
                        "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
        payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
        sheet.append(base_headers + hold_headers + payment_headers)

        # Style the header row that was just written
        header_fill = PatternFill(start_color="ADD8E6", end_color="ADD8E6", fill_type="solid")
        header_font = Font(bold=True)
        for cell in sheet[sheet.max_row]:
            cell.font = header_font
            cell.fill = header_fill
        first_data_row = sheet.max_row + 1

        blank_holds = [""] * len(hold_headers)

        def payment_only_row(invoice_label, payment):
            """Row that fills only the invoice number and the payment columns."""
            row = [pmc_no, "", "", "", "", invoice_label]
            row += [""] * (len(base_headers) - len(row))
            row += blank_holds
            row += ["", payment["Payment_Amount"], payment["TDS_Payment_Amount"],
                    payment["Total_amount"], payment["UTR"]]
            return row

        seen_invoices = set()
        processed_payments = set()
        for inv in invoices:
            invoice_no = inv["Invoice_No"]
            # The procedure may return an invoice more than once; everything
            # belonging to an invoice is written a single time.
            if invoice_no in seen_invoices:
                continue
            seen_invoices.add(invoice_no)

            payments = payments_map.get(invoice_no, [])
            first_payment = payments[0] if payments else None

            row = [
                pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"],
                inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"],
                inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"],
                inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
            ]
            invoice_holds = hold_data.get(inv["Invoice_Id"], {})
            row += [invoice_holds.get(ht_id, "") for ht_id in hold_type_ids]
            row += [
                inv["Final_Amount"],
                first_payment["Payment_Amount"] if first_payment else "",
                first_payment["TDS_Payment_Amount"] if first_payment else "",
                first_payment["Total_amount"] if first_payment else "",
                first_payment["UTR"] if first_payment else ""
            ]
            sheet.append(row)
            if first_payment:
                processed_payments.add(f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}")

            # GST release note(s) recorded against this invoice
            for gr in gst_release_map.get(invoice_no, []):
                gst_row = [pmc_no, "", "", "GST Release Note", "", gr["Invoice_No"], gr["Basic_Amount"]]
                gst_row += [""] * (len(base_headers) - len(gst_row))
                gst_row += blank_holds
                gst_row += [
                    gr["Final_Amount"], "", "", gr["Total_Amount"],
                    gr["UTR"] if gr["UTR"] else ""
                ]
                sheet.append(gst_row)

            # Further payments on the same invoice, skipping any already shown
            for payment in payments[1:]:
                payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}"
                if payment_id not in processed_payments:
                    sheet.append(payment_only_row(payment['invoice_no'], payment))
                    processed_payments.add(payment_id)

            # Credit note(s) recorded against this invoice
            for cn in credit_note_map.get(invoice_no, []):
                credit_row = [
                    pmc_no, "", "", cn.get("Invoice_Details", "Credit Note"), "", cn.get("Invoice_No", ""),
                    cn.get("Basic_Amount", ""), cn.get("Debit_Amount", ""),
                    cn.get("After_Debit_Amount", ""), cn.get("GST_Amount", ""), cn.get("Amount", ""),
                    "", "", "", "", ""
                ]
                credit_row += blank_holds
                # One value per payment header so Total Amount and UTR land in
                # their own columns (there is no TDS payment for a credit note).
                credit_row += [
                    cn.get("Final_Amount", ""),
                    cn.get("Payment_Amount", ""),
                    "",
                    cn.get("Total_Amount", ""),
                    cn.get("UTR", "")
                ]
                sheet.append(credit_row)

        # Payments that are not tied to any invoice
        for payment in extra_payments:
            sheet.append(payment_only_row("", payment))

        # Totals over every data row written so far
        total_basic_amount = Decimal('0.00')
        total_tds_amount = Decimal('0.00')
        total_sd_amount = Decimal('0.00')
        total_on_commission = Decimal('0.00')
        total_final_amount = Decimal('0.00')
        total_payment_amount = Decimal('0.00')
        total_tds_payment_amount = Decimal('0.00')
        total_total_paid = Decimal('0.00')
        total_hold_amount_dynamic = Decimal('0.00')
        hold_start = len(base_headers)
        hold_end = hold_start + len(hold_headers)
        for row in sheet.iter_rows(min_row=first_data_row, max_row=sheet.max_row, values_only=True):
            total_basic_amount += _pmc_to_decimal(row[6])
            total_tds_amount += _pmc_to_decimal(row[11])
            total_sd_amount += _pmc_to_decimal(row[12])
            total_on_commission += _pmc_to_decimal(row[13])
            # The last five columns are the payment headers; UTR (row[-1]) is text.
            total_final_amount += _pmc_to_decimal(row[-5])
            total_payment_amount += _pmc_to_decimal(row[-4])
            total_tds_payment_amount += _pmc_to_decimal(row[-3])
            total_total_paid += _pmc_to_decimal(row[-2])
            for cell_value in row[hold_start:hold_end]:
                total_hold_amount_dynamic += _pmc_to_decimal(cell_value)

        totals_row = [""] * len(base_headers)
        totals_row[0] = "TOTAL"
        totals_row[6] = total_basic_amount     # Basic Amount
        totals_row[11] = total_tds_amount      # TDS
        totals_row[12] = total_sd_amount       # SD
        totals_row[13] = total_on_commission   # On Commission
        # The combined hold total is shown in the first hold column only.
        hold_values = [""] * len(hold_headers)
        if hold_values and total_hold_amount_dynamic:
            hold_values[0] = total_hold_amount_dynamic
        totals_row += hold_values
        totals_row += [
            total_final_amount,
            total_payment_amount,
            total_tds_payment_amount,
            total_total_paid,
            ""  # UTR
        ]
        sheet.append([])
        sheet.append(totals_row)

        # Summary
        sheet.append([])
        today = datetime.today().strftime('%A, %Y-%m-%d')
        sheet.append(["Contractor Name", contractor_info["Contractor_Name"]])
        sheet.append(["Date", today])
        sheet.append(["Description", "Amount"])
        sheet.append(["Advance/Surplus", str(total_final_amount - total_total_paid)])
        sheet.append(["Total Hold Amount", str(total_hold_amount_dynamic)])
        # NOTE(review): this line reports the invoice TDS total; an earlier
        # version reported payment amount + TDS payment - confirm which is wanted.
        sheet.append(["Amount With TDS", str(total_tds_amount)])
        # Only the last summary row is set in bold.
        for cell in sheet[sheet.max_row]:
            cell.font = Font(bold=True)

        workbook.save(output_file)
        workbook.close()
    finally:
        cursor.close()
        connection.close()
    return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True)
# --------- Hold Types Controller --------------------------------------------
# Route to Add a New Hold Type
def _hold_type_check_found_match(cursor):
    """Return True when the last ``CheckHoldTypeExists`` call reported a match.

    NOTE(review): the procedure's result shape is not visible from this file.
    This treats a first column that is truthy (a non-zero count, or a matching
    row's first value) as "exists" - confirm against the procedure definition.
    """
    for result in cursor.stored_results():
        row = result.fetchone()
        if row is None:
            continue
        first_value = next(iter(row.values()), None) if isinstance(row, dict) else row[0]
        if first_value:
            return True
    return False


@app.route('/add_hold_type', methods=['POST', 'GET'])
def add_hold_type():
    """List hold types (GET) or add a new one (POST).

    GET renders ``add_hold_type.html`` with the rows from ``GetAllHoldTypes``.
    POST validates the ``hold_type`` form field and answers with JSON:
    400 when the name does not start with a letter or already exists,
    201 on success, 500 on a database error.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    hold_types = []
    try:
        if request.method == 'POST':
            hold_type = request.form.get('hold_type', '').strip()

            # Validation: must start with a letter
            if not hold_type or not hold_type[0].isalpha():
                return jsonify({"status": "error", "message": "Hold Type must start with a letter."}), 400

            # Validation: reject duplicates. The procedure was called before,
            # but its result was never read, so duplicates were not rejected.
            cursor.callproc('CheckHoldTypeExists', [hold_type])
            if _hold_type_check_found_match(cursor):
                return jsonify({"status": "error", "message": "This Hold Type already exists."}), 400

            try:
                cursor.callproc('SaveHoldType', [hold_type])
                connection.commit()
                return jsonify({"status": "success", "message": "Hold Type added successfully!"}), 201
            except mysql.connector.Error as e:
                connection.rollback()
                return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500

        # Only the page render needs the list; every POST path returns JSON above.
        cursor.callproc("GetAllHoldTypes")
        for hold in cursor.stored_results():
            hold_types = hold.fetchall()
    except mysql.connector.Error as e:
        return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500
    finally:
        cursor.close()
        connection.close()
    return render_template('add_hold_type.html', Hold_Types_data=hold_types)
# Route to Update Hold Type
# @app.route('/update_hold_type/<int:id>', methods=['POST', 'GET'])
# def update_hold_type(id):
# # GET request: Show the form with the current hold type
# if request.method == 'GET':
# connection = config.get_db_connection()
# cursor = connection.cursor()
# # cursor.execute("SELECT * FROM hold_types WHERE hold_type_id = %s", (id,))
# # hold_type = cursor.fetchone()
#
# cursor.callproc("GetHoldTypesById", (id,))
# for hold in cursor.stored_results():
# hold_type = hold.fetchone()
#
# cursor.close()
# connection.close()
#
# if not hold_type:
# return jsonify({'status': 'error', 'message': 'Hold Type not found.'}), 404
#
# return render_template('edit_hold_type.html', hold_type=hold_type)
#
# # POST request: Update the hold type
# if request.method == 'POST':
# new_hold_type = request.form.get('hold_type').strip()
#
# # Validation: Must start with a letter
# if not new_hold_type or not new_hold_type[0].isalpha():
# return jsonify(ResponseHandler.invalid_name('Hold Type')), 400
#
# connection = config.get_db_connection()
# cursor = connection.cursor()
#
# try:
# # Check if the hold type exists before updating
# # cursor.execute("SELECT * FROM hold_types WHERE hold_type_id = %s", (id,))
# # hold_type = cursor.fetchone()
# cursor.callproc("GetHoldTypesById", (id,))
# for hold in cursor.stored_results():
# hold_type = hold.fetchone()
#
# if not hold_type:
# return jsonify({'status': 'error', 'message': 'Hold Type not found.'}), 404
#
# # Update the hold type
# # cursor.execute("UPDATE hold_types SET hold_type = %s WHERE hold_type_id = %s", (new_hold_type, id))
# cursor.callproc("UpdateHoldTypeById", (id,new_hold_type))
# connection.commit()
# return jsonify(ResponseHandler.update_success('Hold Type'))
#
# except mysql.connector.Error as e:
# connection.rollback()
# return jsonify(ResponseHandler.update_failure('Hold Type')), 500
# finally:
# cursor.close()
# connection.close()
@app.route('/update_hold_type/<int:id>', methods=['GET', 'POST'])
def update_hold_type(id):
    """Show the edit form for a hold type (GET) or save its new name (POST).

    Every outcome other than rendering the edit form flashes a message and
    redirects to the ``add_hold_type`` page. The cursor and connection are
    closed on every path.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        if request.method != 'POST':
            hold_type = None  # stays None when the procedure returns no result set
            cursor.callproc("GetHoldTypesById", (id,))
            for hold in cursor.stored_results():
                hold_type = hold.fetchone()
            if not hold_type:
                flash('Hold Type not found.', 'error')
                return redirect(url_for('add_hold_type'))
            return render_template('edit_hold_type.html', hold_type=hold_type)

        new_hold_type = request.form.get('hold_type', '').strip()
        if not new_hold_type or not new_hold_type[0].isalpha():
            flash('Invalid hold type name. Must start with a letter.', 'error')
            return redirect(url_for('add_hold_type'))

        try:
            hold_type = None
            cursor.callproc("GetHoldTypesById", (id,))
            for hold in cursor.stored_results():
                hold_type = hold.fetchone()
            if not hold_type:
                flash('Hold Type not found.', 'error')
                return redirect(url_for('add_hold_type'))

            cursor.callproc("UpdateHoldTypeById", (id, new_hold_type))
            connection.commit()
            flash('Hold Type updated successfully!', 'success')
        except mysql.connector.Error:
            connection.rollback()
            flash('Failed to update Hold Type.', 'error')
        return redirect(url_for('add_hold_type'))
    finally:
        cursor.close()
        connection.close()
# Route to Delete Hold Type
@app.route('/delete_hold_type/<int:id>', methods=['POST'])
@login_required
def delete_hold_type(id):
    """Delete the hold type with the given id and answer with JSON.

    Returns 404 when ``GetHoldTypesById`` finds nothing and 500 on a database
    error. The audit entry is written only after the delete is committed.
    Login is required because the audit entry reads ``current_user.id``.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        hold_type = None  # stays None when the procedure returns no result set
        cursor.callproc("GetHoldTypesById", (id,))
        for hold in cursor.stored_results():
            hold_type = hold.fetchone()
        if not hold_type:
            return jsonify({'status': 'error', 'message': 'Hold Type not found.'}), 404

        cursor.callproc("DeleteHoldType", (id,))
        connection.commit()
        LogHelper.log_action("Delete hold type", f"User {current_user.id} Delete hold type'{ hold_type}'")
        return jsonify(ResponseHandler.delete_success('Hold Type'))
    except mysql.connector.Error:
        connection.rollback()
        return jsonify(ResponseHandler.delete_failure('Hold Type')), 500
    finally:
        cursor.close()
        connection.close()
@app.route('/unreleased_gst')
def unreleased_gst():
    """Render the invoices that have no matching GST release.

    An invoice counts as released when its invoice number appears in a
    ``gst_release.Invoice_No`` value, or when its ``GST_SD_Amount`` equals the
    ``Basic_Amount`` of any GST release. All other invoices are passed to
    ``unreleased_gst.html`` as ``data``.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        # Step 1: Fetch invoices
        cursor.execute("""
            SELECT
                i.PMC_No, i.Invoice_No, i.GST_SD_Amount, i.Invoice_Details,
                i.Basic_Amount, i.Final_Amount
            FROM `invoice` i
        """)
        invoices = cursor.fetchall()

        # Step 2: Fetch GST releases
        cursor.execute("""
            SELECT Invoice_No, Basic_Amount ,Final_Amount
            FROM gst_release
        """)
        gst_releases = cursor.fetchall()

        # Step 3: Lookup sets. A release may name several invoices separated
        # by '&' or ',' (the PMC report splits them the same way), so both the
        # raw value and each individual number are indexed.
        gst_invoice_nos = set()
        for g in gst_releases:
            raw_invoice_no = g['Invoice_No']
            if not raw_invoice_no:
                continue
            gst_invoice_nos.add(raw_invoice_no)
            compact = str(raw_invoice_no).replace(' ', '')
            gst_invoice_nos.update(token for token in re.split(r'[&,]', compact) if token)
        gst_basic_amounts = {float(g['Basic_Amount']) for g in gst_releases if g['Basic_Amount'] is not None}

        # Step 4: Filter
        unreleased = []
        for inv in invoices:
            invoice_no = inv['Invoice_No']
            match_by_invoice = invoice_no in gst_invoice_nos
            if not match_by_invoice and invoice_no:
                # Split tokens have their spaces removed, so compare alike.
                match_by_invoice = str(invoice_no).replace(' ', '') in gst_invoice_nos
            match_by_gst_amount = float(inv.get('GST_SD_Amount') or 0.0) in gst_basic_amounts
            if not (match_by_invoice or match_by_gst_amount):
                unreleased.append(inv)
        return render_template("unreleased_gst.html", data=unreleased)
    finally:
        cursor.close()
        connection.close()
# -- end hold types controller --------------------
# Route to display the HTML form
@app.route('/add_work_order', methods=['GET'])
def add_work_order():
    """Render the work order form with the subcontractor list for its dropdown."""
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute("SELECT Contractor_id, Contractor_Name FROM subcontractors")
        subcontractor = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        cursor.close()
        connection.close()
    return render_template('add_work_order.html', subcontractor=subcontractor)
# Route to handle form submission (from action="/submit_work_order")
@app.route('/submit_work_order', methods=['POST', 'GET'])
@login_required
def submit_work_order():
    """Insert a work order from the submitted form and re-render the form page.

    A GET carries no form data, so it is redirected to the ``add_work_order``
    form instead of failing on the missing fields. After the insert the page
    is rendered with all work orders (``wo``) and the subcontractor list.
    Login is required because the audit entry reads ``current_user.id``.
    """
    if request.method != 'POST':
        return redirect(url_for('add_work_order'))

    form = request.form
    vendor_name = form['vendor_name']
    work_order_type = form['work_order_type']
    work_order_amount = form['work_order_amount']
    boq_amount = form['boq_amount']
    work_done_percentage = form['work_done_percentage']
    work_order_number = form['work_order_number']
    gst_amount = form['gst_amount']
    tds_amount = form['tds_amount']
    security_deposite = form['security_deposite']
    sd_against_gst = form['sd_against_gst']
    final_total = form['final_total']
    tds_of_gst = form['tds_of_gst']

    # NOTE(review): this INSERT writes columns `security_deposit` and
    # `final_total`, while update_work_order writes `security_deposite` and
    # `final_amount` - confirm the real column names against the schema.
    insert_query = """
        INSERT INTO work_order
            (vendor_name, work_order_type, work_order_amount, boq_amount, work_done_percentage,
             work_order_number, gst_amount, tds_amount, security_deposit, sd_against_gst,
             final_total, tds_of_gst)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute(insert_query, (
            vendor_name, work_order_type, work_order_amount, boq_amount, work_done_percentage,
            work_order_number, gst_amount, tds_amount, security_deposite, sd_against_gst,
            final_total, tds_of_gst))
        connection.commit()
        LogHelper.log_action("Submit Work Order", f"User {current_user.id} Submit Work Order'{ work_order_type}'")

        # Fetch everything the page needs after the insert
        cursor.execute("SELECT * FROM work_order")
        wo = cursor.fetchall()
        cursor.execute("SELECT Contractor_id, Contractor_Name FROM subcontractors")
        subcontractor = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()
    return render_template('add_work_order.html', work_order_type=work_order_type, wo=wo, subcontractor=subcontractor)
@app.route('/delete_work_order/<int:id>', methods=['POST'])
@login_required
def delete_work_order(id):
    """Delete the work order with the given id and return ``{'success': True}``.

    Login is required: the audit entry reads ``current_user.id``, and without
    the guard the row was deleted before that lookup could fail.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("DELETE FROM work_order WHERE work_order_id = %s", (id,))
        connection.commit()
    finally:
        cursor.close()
        connection.close()
    LogHelper.log_action("delete Work Order", f"User {current_user.id} delete Work Order'{ id}'")
    return jsonify({'success': True})
@app.route('/update_work_order/<int:id>', methods=['GET', 'POST'])
def update_work_order(id):
    """Show the edit form for a work order, saving the submitted values on POST.

    On POST the row is updated and committed first; in both cases the current
    row is then read back and rendered in ``update_work_order.html``. The
    cursor stays open until that read is done (it used to be closed right
    after the update and then reused).
    """
    # NOTE(review): this UPDATE writes columns `security_deposite` and
    # `final_amount`, while submit_work_order inserts `security_deposit` and
    # `final_total` - confirm the real column names against the schema.
    update_query = """
        UPDATE work_order
        SET work_order_type = %s,
            work_order_amount = %s,
            boq_amount = %s,
            work_done_percentage = %s,
            work_order_number = %s,
            gst_amount = %s,
            tds_amount = %s,
            security_deposite = %s,
            sd_against_gst = %s,
            final_amount = %s,
            tds_of_gst = %s
        WHERE work_order_id = %s
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            form = request.form
            cursor.execute(update_query, (
                form['work_order_type'], form['work_order_amount'], form['boq_amount'],
                form['work_done_percentage'], form['work_order_number'], form['gst_amount'],
                form['tds_amount'], form['security_deposite'], form['sd_against_gst'],
                form['final_amount'], form['tds_of_gst'], id))
            connection.commit()

        # Read the (possibly just updated) record for the form
        cursor.execute("SELECT * FROM work_order WHERE work_order_id = %s", (id,))
        work_order = cursor.fetchone()
    finally:
        cursor.close()
        connection.close()
    return render_template('update_work_order.html', work_order=work_order)
# Optional: Route to show a success message
@app.route('/success')
def success():
    """Return the plain-text confirmation shown after a work order is submitted."""
    confirmation = "Work Order Submitted Successfully!"
    return confirmation
# Configure the root logger at DEBUG level. This is a module-level statement,
# so it runs once when the module is imported.
# NOTE(review): DEBUG is verbose - confirm this level is wanted outside development.
logging.basicConfig(level=logging.DEBUG)
@app.route('/add_purchase_order', methods=['GET', 'POST'])
@login_required
def add_purchase_order():
    """Render the purchase order page, inserting a new order first on POST.

    The page always receives the full ``purchase_order`` table as
    ``purchases``. Login is required because the audit entry reads
    ``current_user.id``.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            form = request.form
            purchase_order_no = form.get('purchase_order_no')
            insert_query = """
                INSERT INTO purchase_order (
                    purchase_date, supplier_name, purchase_order_no, item_name, quantity, unit, rate, amount,
                    GST_Amount, TDS, final_amount
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                form.get('purchase_date'), form.get('supplier_name'), purchase_order_no,
                form.get('item_name'), form.get('quantity'), form.get('unit'), form.get('rate'),
                form.get('amount'), form.get('GST_Amount'), form.get('TDS'), form.get('final_amount')
            ))
            connection.commit()
            # Log only once the insert has actually been committed.
            LogHelper.log_action("Add purchase order", f"User {current_user.id} Added puirchase Order'{ purchase_order_no}'")

        # Always fetch the current data for the page
        cursor.execute("SELECT * FROM purchase_order")
        purchases = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()
    return render_template('add_purchase_order.html', purchases=purchases)
# Show all purchases
@app.route('/purchase_orders')
def show_purchase_orders():
    """Render ``add_purchase_order.html`` with every row of ``purchase_order``."""
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute("SELECT * FROM purchase_order")
        purchases = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        cursor.close()
        connection.close()
    return render_template('add_purchase_order.html', purchases=purchases)
# Delete purchase order
@app.route('/delete_purchase/<int:id>', methods=['POST'])
@login_required
def delete_purchase(id):
    """Delete a purchase order and return to the purchase order list.

    The list template used to be rendered here without its ``purchases``
    data; redirecting to ``show_purchase_orders`` (as ``update_purchase``
    does) shows the remaining rows. Login is required because the audit entry
    reads ``current_user.id``.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("DELETE FROM purchase_order WHERE purchase_id = %s", (id,))
        connection.commit()
    finally:
        cursor.close()
        connection.close()
    LogHelper.log_action("Delete purchase order", f"User {current_user.id} Deleted puirchase Order'{ id}'")
    return redirect(url_for('show_purchase_orders'))
# Edit purchase order (form + update logic)
@app.route('/update_purchase/<int:id>', methods=['GET', 'POST'])
@login_required
def update_purchase(id):
    """Show the edit form for a purchase order (GET) or save it (POST).

    A successful POST redirects to ``show_purchase_orders``. Login is required
    because the audit entry reads ``current_user.id``.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            data = request.form
            cursor.execute("""
                UPDATE purchase_order
                SET purchase_date = %s,
                    supplier_name = %s,
                    purchase_order_no = %s,
                    item_name = %s,
                    quantity = %s,
                    unit = %s,
                    rate = %s,
                    amount = %s,
                    GST_Amount = %s,
                    TDS = %s,
                    final_amount = %s
                WHERE purchase_id = %s
            """, (
                data['purchase_date'], data['supplier_name'], data['purchase_order_no'], data['item_name'],
                data['quantity'], data['unit'], data['rate'], data['amount'], data['GST_Amount'],
                data['TDS'], data['final_amount'], id
            ))
            connection.commit()
            # The audit entry used to describe this update as a deletion.
            LogHelper.log_action("Update purchase order", f"User {current_user.id} Updated puirchase Order'{ id}'")
            return redirect(url_for('show_purchase_orders'))

        # Show edit form
        cursor.execute("SELECT * FROM purchase_order WHERE purchase_id = %s", (id,))
        purchase = cursor.fetchone()
    finally:
        cursor.close()
        connection.close()
    return render_template('edit_purchase.html', purchase=purchase)
# SHOW all GRNs + ADD form
@app.route('/grn', methods=['GET'])
def grn_page():
    """Render the GRN page with the purchase orders for its dropdown and all GRNs."""
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        # Purchase orders for the dropdown
        cursor.execute("SELECT purchase_id, supplier_name FROM purchase_order")
        purchase_orders = cursor.fetchall()
        # All GRNs to display
        cursor.execute("SELECT * FROM goods_receive_note")
        grns = cursor.fetchall()
    finally:
        # Release the connection even when a query fails.
        cursor.close()
        connection.close()
    return render_template('grn_form.html', purchase_orders=purchase_orders, grns=grns)
# ADD new GRN
@app.route('/add_grn', methods=['POST', 'GET'])
def add_grn():
    """Insert a goods receive note from the submitted form, then show the GRN page.

    A GET carries no form data and used to insert a row of NULLs; it is now
    sent straight to ``grn_page``. After a successful insert the browser is
    redirected to ``grn_page`` too, so the list is rendered by the same code
    path (dictionary rows) as the normal page view.
    """
    if request.method != 'POST':
        return redirect(url_for('grn_page'))

    data = request.form
    query = """
        INSERT INTO goods_receive_note
            (grn_date, purchase_id, supplier_name, item_description, received_quantity, unit, rate, amount, remarks)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute(query, (
            data.get('grn_date'),
            data.get('purchase_id'),
            data.get('supplier_name'),
            data.get('item_description'),
            data.get('received_quantity'),
            data.get('unit'),
            data.get('rate'),
            data.get('amount'),
            data.get('remarks')
        ))
        connection.commit()
    finally:
        cursor.close()
        connection.close()
    return redirect(url_for('grn_page'))
# UPDATE GRN
@app.route('/update_grn/<int:grn_id>', methods=['GET', 'POST'])
def update_grn(grn_id):
    """Show the edit form for a GRN (GET) or save the edited values (POST).

    Args:
        grn_id: Primary key of the ``goods_receive_note`` row to edit.

    Returns:
        For POST: a redirect to the GRN list page after the update commits.
        For GET: the rendered ``edit_grn.html`` with the row as ``grn``, or
        a plain-text 404 response when no row has that id.

    Note:
        Form fields are read with ``data[...]``, so a missing field raises
        Werkzeug's ``BadRequestKeyError`` (HTTP 400) before any SQL runs.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            data = request.form
            query = """
                UPDATE goods_receive_note
                SET grn_date=%s, purchase_id=%s, supplier_name=%s, item_description=%s,
                    received_quantity=%s, unit=%s, rate=%s, amount=%s, remarks=%s
                WHERE grn_id=%s
            """
            cursor.execute(query, (
                data['grn_date'], data['purchase_id'], data['supplier_name'],
                data['item_description'], data['received_quantity'], data['unit'],
                data['rate'], data['amount'], data['remarks'], grn_id
            ))
            connection.commit()
            # The GRN list view is registered as 'grn_page'; the previous
            # target 'grns' does not match the list view's endpoint name.
            return redirect(url_for('grn_page'))

        cursor.execute("SELECT * FROM goods_receive_note WHERE grn_id = %s", (grn_id,))
        grn = cursor.fetchone()
    finally:
        # Runs on every exit path, including the early redirect above.
        cursor.close()
        connection.close()

    if grn is None:
        return "GRN not found", 404
    return render_template("edit_grn.html", grn=grn)
# DELETE GRN
@app.route('/delete_grn/<int:grn_id>', methods=['POST'])
def delete_grn(grn_id):
    """Delete one goods receive note and return to the GRN page.

    Args:
        grn_id: Primary key of the ``goods_receive_note`` row to delete.

    Returns:
        A redirect to the ``grn_page`` view. Rendering ``grn_form.html``
        directly would omit the ``purchase_orders`` and ``grns`` values
        that ``grn_page`` passes to that template.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("DELETE FROM goods_receive_note WHERE grn_id = %s", (grn_id,))
        connection.commit()
    finally:
        # Release the DB handles even when the delete fails.
        cursor.close()
        connection.close()
    return redirect(url_for('grn_page'))
@app.route('/work_order_report', methods=['GET'])
def work_order_report():
    """Render the work order report page."""
    template_name = 'work_order_report.html'
    return render_template(template_name)
# ✅ Vendor Name Search (for Select2)
@app.route('/get_vendor_names')
def get_vendor_names():
    """Return, as a JSON list, the distinct vendor names containing ``q``.

    The ``q`` query parameter defaults to an empty string, which matches
    every vendor name.
    """
    search_term = request.args.get('q', '')
    like_pattern = "%" + search_term + "%"

    db = config.get_db_connection()
    cur = db.cursor()
    cur.execute("SELECT DISTINCT vendor_name FROM work_order WHERE vendor_name LIKE %s", (like_pattern,))
    rows = cur.fetchall()

    # Each row is a one-column tuple; keep only the name.
    names = []
    for record in rows:
        names.append(record[0])

    cur.close()
    db.close()
    return jsonify(names)
# ✅ Work Order Number Search (with or without vendor)
@app.route('/get_work_order_numbers')
def get_work_order_numbers():
    """Return, as a JSON list, the distinct work order numbers containing ``q``.

    When the ``vendor_name`` query parameter is non-empty, the search is
    restricted to that vendor's work orders.
    """
    vendor = request.args.get('vendor_name', '')
    search_term = request.args.get('q', '')
    like_pattern = f"%{search_term}%"

    db = config.get_db_connection()
    cur = db.cursor()

    # Pick the statement and its parameters first, then execute once.
    if not vendor:
        sql = "SELECT DISTINCT work_order_number FROM work_order WHERE work_order_number LIKE %s"
        sql_args = (like_pattern,)
    else:
        sql = "SELECT DISTINCT work_order_number FROM work_order WHERE vendor_name = %s AND work_order_number LIKE %s"
        sql_args = (vendor, like_pattern)
    cur.execute(sql, sql_args)

    numbers = []
    for record in cur.fetchall():
        numbers.append(record[0])

    cur.close()
    db.close()
    return jsonify(numbers)
# ✅ Get Work Order Data (Filtered)
@app.route('/get_work_order_data')
def get_work_order_data():
    """Return work order rows as JSON, optionally filtered.

    Query parameters ``vendor_name`` and ``work_order_number`` each add an
    exact-match condition when present and non-empty.
    """
    # (SQL fragment, submitted value) pairs, in the order they are appended.
    optional_filters = (
        (" AND vendor_name = %s", request.args.get('vendor_name')),
        (" AND work_order_number = %s", request.args.get('work_order_number')),
    )

    sql = "SELECT * FROM work_order WHERE 1=1"
    values = []
    for fragment, value in optional_filters:
        if value:
            sql += fragment
            values.append(value)

    db = config.get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute(sql, tuple(values))
    rows = cur.fetchall()
    cur.close()
    db.close()
    return jsonify(rows)
@app.route('/download_work_order_report')
def download_work_order_report():
    """Export the filtered work orders as a styled Excel attachment.

    Query parameters:
        vendor_name: Optional exact-match filter on ``vendor_name``.
        work_order_number: Optional exact-match filter on
            ``work_order_number``; the literal string "null" is treated
            as "no filter".

    Returns:
        ``work_order_report.xlsx`` as a file download, or a plain-text 404
        response when no rows match the filters.
    """
    vendor_name = request.args.get('vendor_name')
    work_order_number = request.args.get('work_order_number')
    if work_order_number == "null":
        work_order_number = None

    query = "SELECT * FROM work_order WHERE 1=1"
    params = []
    if vendor_name:
        query += " AND vendor_name = %s"
        params.append(vendor_name)
    if work_order_number:
        query += " AND work_order_number = %s"
        params.append(work_order_number)

    conn = config.get_db_connection()
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(query, tuple(params))
        data = cursor.fetchall()
    finally:
        # Release the DB handles even when the query fails.
        cursor.close()
        conn.close()

    if not data:
        return "No data found for the selected filters", 404

    # Convert to DataFrame
    df = pd.DataFrame(data)

    # Anchor the file at the application root. pandas and os resolve a
    # relative path against the current working directory, whereas Flask's
    # send_file resolves it against the app root, so a relative path only
    # worked when the two happened to coincide.
    # NOTE(review): every request writes this same file, so concurrent
    # downloads can overwrite each other; consider a per-request buffer.
    output_path = os.path.join(app.root_path, 'static', 'downloads', 'work_order_report.xlsx')
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_excel(output_path, index=False, startrow=2)  # Leave space for heading

    # Load workbook for styling
    wb = load_workbook(output_path)
    ws = wb.active

    # Add a merged title across all data columns
    title = "Work Order Report"
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(df.columns))
    title_cell = ws.cell(row=1, column=1)
    title_cell.value = title
    title_cell.font = Font(size=14, bold=True, color="1F4E78")
    title_cell.alignment = Alignment(horizontal="center", vertical="center")

    # Style header row (sheet row 3, because startrow=2 is zero-based)
    header_font = Font(bold=True)
    for col_num, column_name in enumerate(df.columns, 1):
        cell = ws.cell(row=3, column=col_num)
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center", vertical="center")
        # Column width follows the header text, with a minimum of 12
        max_length = max(len(str(column_name)), 12)
        ws.column_dimensions[get_column_letter(col_num)].width = max_length + 2

    wb.save(output_path)
    return send_file(output_path, as_attachment=True)
@app.route('/purchase_order_report', methods=['GET'])
def purchase_order_report():
    """Render the purchase order report page."""
    template_name = 'purchase_order_report.html'
    return render_template(template_name)
@app.route('/get_supplier_names')
def get_supplier_names():
    """Return, as a JSON list, the distinct supplier names containing ``q``.

    The ``q`` query parameter defaults to an empty string, which matches
    every supplier name.
    """
    query = request.args.get('q', '')  # Get the search term from Select2

    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        # DISTINCT prevents the same supplier from being listed several
        # times, matching how get_vendor_names queries work_order.
        cursor.execute(
            "SELECT DISTINCT supplier_name FROM purchase_order WHERE supplier_name LIKE %s",
            (f"%{query}%",)
        )
        suppliers = [row[0] for row in cursor.fetchall()]
    finally:
        # Release the DB handles even when the query fails.
        cursor.close()
        connection.close()
    return jsonify(suppliers)
@app.route('/get_purchase_order_numbers')
def get_purchase_order_numbers():
    """Return, as a JSON list, the distinct purchase order numbers containing ``q``.

    When the ``supplier_name`` query parameter is non-empty, the search is
    restricted to that supplier's purchase orders.
    """
    supplier = request.args.get('supplier_name', '')
    query = request.args.get('q', '')

    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        # DISTINCT keeps a number from appearing more than once, matching
        # how get_work_order_numbers queries work_order.
        if supplier:
            cursor.execute("""
                SELECT DISTINCT purchase_order_no
                FROM purchase_order
                WHERE supplier_name = %s AND purchase_order_no LIKE %s
            """, (supplier, f"%{query}%"))
        else:
            cursor.execute("""
                SELECT DISTINCT purchase_order_no
                FROM purchase_order
                WHERE purchase_order_no LIKE %s
            """, (f"%{query}%",))
        orders = [row[0] for row in cursor.fetchall()]
    finally:
        # Release the DB handles even when the query fails.
        cursor.close()
        connection.close()
    return jsonify(orders)
@app.route('/get_purchase_order_data')
def get_purchase_order_data():
    """Return purchase order rows as JSON, optionally filtered.

    Query parameters ``supplier_name`` and ``purchase_order_no`` each add an
    exact-match condition when present and non-empty.
    """
    # (SQL fragment, submitted value) pairs, in the order they are appended.
    optional_filters = (
        (" AND supplier_name = %s", request.args.get('supplier_name')),
        (" AND purchase_order_no = %s", request.args.get('purchase_order_no')),
    )

    sql = "SELECT * FROM purchase_order WHERE 1=1"
    values = []
    for fragment, value in optional_filters:
        if value:
            sql += fragment
            values.append(value)

    db = config.get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute(sql, tuple(values))
    rows = cur.fetchall()
    cur.close()
    db.close()
    return jsonify(rows)
@app.route('/download_purchase_order_report')
def download_purchase_order_report():
    """Export the filtered purchase orders as a styled Excel attachment.

    Query parameters:
        supplier_name: Optional exact-match filter on ``supplier_name``.
        purchase_order_no: Optional exact-match filter on
            ``purchase_order_no``; the literal string "null" is treated
            as "no filter".

    Returns:
        ``purchase_order_report.xlsx`` as a file download, or a plain-text
        404 response when no rows match the filters.
    """
    supplier_name = request.args.get('supplier_name')
    purchase_order_no = request.args.get('purchase_order_no')
    if purchase_order_no == "null":
        purchase_order_no = None

    # This route is not wrapped in @login_required, so current_user may be
    # an anonymous user without an ``id`` attribute; fall back to a
    # placeholder instead of raising AttributeError.
    user_id = getattr(current_user, 'id', 'anonymous')
    LogHelper.log_action("Download purchase order", f"User {user_id} Download puirchase Order'{purchase_order_no}'")

    query = "SELECT * FROM purchase_order WHERE 1=1"
    params = []
    if supplier_name:
        query += " AND supplier_name = %s"
        params.append(supplier_name)
    if purchase_order_no:
        query += " AND purchase_order_no = %s"
        params.append(purchase_order_no)

    conn = config.get_db_connection()
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(query, tuple(params))
        data = cursor.fetchall()
    finally:
        # Release the DB handles even when the query fails.
        cursor.close()
        conn.close()

    if not data:
        return "No data found for the selected filters", 404

    # Convert to DataFrame
    df = pd.DataFrame(data)

    # Anchor the file at the application root. pandas and os resolve a
    # relative path against the current working directory, whereas Flask's
    # send_file resolves it against the app root, so a relative path only
    # worked when the two happened to coincide.
    # NOTE(review): every request writes this same file, so concurrent
    # downloads can overwrite each other; consider a per-request buffer.
    output_path = os.path.join(app.root_path, 'static', 'downloads', 'purchase_order_report.xlsx')
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_excel(output_path, index=False, startrow=2)  # Reserve space for heading

    # Load workbook for styling
    wb = load_workbook(output_path)
    ws = wb.active

    # Add a merged title across all data columns
    title = "Purchase Order Report"
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(df.columns))
    title_cell = ws.cell(row=1, column=1)
    title_cell.value = title
    title_cell.font = Font(size=14, bold=True, color="1F4E78")
    title_cell.alignment = Alignment(horizontal="center", vertical="center")

    # Style header row (sheet row 3, because startrow=2 is zero-based)
    header_font = Font(bold=True)
    for col_num, column_name in enumerate(df.columns, 1):
        cell = ws.cell(row=3, column=col_num)
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center", vertical="center")
        # Column width follows the header text, with a minimum of 12
        max_length = max(len(str(column_name)), 12)
        ws.column_dimensions[get_column_letter(col_num)].width = max_length + 2

    wb.save(output_path)
    return send_file(output_path, as_attachment=True)
if __name__ == '__main__':
    # The Werkzeug interactive debugger allows arbitrary code execution, so
    # it must not be switched on unconditionally for a server bound to all
    # interfaces. Opt in explicitly for local development: FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)