import config from flask_login import current_user from model.Log import LogHelper from services.Generalservice import GeneralUse from model.FolderAndFile import FolderAndFile from model.Report import ReportHelper class PmcReport: data=[] # @staticmethod # def get_pmc_report(pmc_no): # connection = config.get_db_connection() # cursor = connection.cursor(dictionary=True, buffered=True) # try: # pmc_info = GeneralUse.execute_sp(cursor, 'GetContractorInfoByPmcNo', [pmc_no], True) # cursor.callproc("Get_pmc_hold_types", (pmc_no, pmc_info["Contractor_Id"])) # hold_types = next(cursor.stored_results()).fetchall() # # Extract hold_type_ids # hold_type_ids = [ht['hold_type_id'] for ht in hold_types] # invoices = [] # hold_type_ids_str = ",".join(map(str, hold_type_ids)) # cursor.callproc('GetInvoices_WithHold',[pmc_no, pmc_info["Contractor_Id"], hold_type_ids_str]) # for result in cursor.stored_results(): # invoices = result.fetchall() # gst_rel = GeneralUse.execute_sp(cursor, 'GetGSTReleaseByPMC', [pmc_no]) # hold_release = GeneralUse.execute_sp(cursor, 'GetHoldReleaseByPMC', [pmc_no]) # credit_note = GeneralUse.execute_sp(cursor, 'GetCreditNoteByPMC', [pmc_no]) # payments = GeneralUse.execute_sp(cursor, 'GetPaymentsByPMC', [pmc_no]) # totals = { # "sum_invo_basic_amt": sum(row.get('Basic_Amount', 0) or 0 for row in invoices), # "sum_invo_debit_amt": sum(row.get('Debit_Amount', 0) or 0 for row in invoices), # "sum_invo_after_debit_amt": sum(row.get('After_Debit_Amount', 0) or 0 for row in invoices), # "sum_invo_amt": sum(row.get('Amount', 0) or 0 for row in invoices), # "sum_invo_gst_amt": sum(row.get('GST_Amount', 0) or 0 for row in invoices), # "sum_invo_tds_amt": sum(row.get('TDS_Amount', 0) or 0 for row in invoices), # "sum_invo_ds_amt": sum(row.get('SD_Amount', 0) or 0 for row in invoices), # "sum_invo_on_commission": sum(row.get('On_Commission', 0) or 0 for row in invoices), # "sum_invo_hydro_test": sum(row.get('Hydro_Testing', 0) or 0 for row in invoices), # "sum_invo_gst_sd_amt": sum(row.get('GST_SD_Amount', 0) or 0 for row in invoices), # "sum_invo_final_amt": sum(row.get('Final_Amount', 0) or 0 for row in invoices), # "sum_invo_hold_amt": sum(row.get('hold_amount', 0) or 0 for row in invoices), # "sum_gst_basic_amt": sum(row.get('basic_amount', 0) or 0 for row in gst_rel), # "sum_gst_final_amt": sum(row.get('final_amount', 0) or 0 for row in gst_rel), # "sum_pay_payment_amt": sum(row.get('Payment_Amount', 0) or 0 for row in payments), # "sum_pay_tds_payment_amt": sum(row.get('TDS_Payment_Amount', 0) or 0 for row in payments), # "sum_pay_total_amt": sum(row.get('Total_amount', 0) or 0 for row in payments) # } # return { # "info": pmc_info, # "invoices": invoices, # "hold_types": hold_types, # "gst_rel": gst_rel, # "payments": payments, # "credit_note": credit_note, # "hold_release": hold_release, # "total": totals # } # finally: # cursor.close() # connection.close() # @staticmethod # def download_pmc_report(pmc_no): # connection = config.get_db_connection() # if not connection: # return None # cursor = connection.cursor(dictionary=True) # try: # filename = f"PMC_Report_{pmc_no}.xlsx" # output_folder = FolderAndFile.get_download_folder() # output_file = FolderAndFile.get_download_path(filename) # contractor_info = GeneralUse.execute_sp(cursor, 'GetContractorInfoByPmcNo', [pmc_no]) # contractor_info = contractor_info[0] if contractor_info else None # print("contractor_info:::",contractor_info) # if not contractor_info: # return None # hold_types = GeneralUse.execute_sp(cursor, 'GetHoldTypesByContractor',[contractor_info["Contractor_Id"]]) # hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # invoices = GeneralUse.execute_sp(cursor, 'GetInvoicesByContractorOrPMCNo', [None,pmc_no]) # credit_notes = GeneralUse.execute_sp(cursor, 'NewGetCreditNotesByPMCNo', [pmc_no]) # credit_note_map = {} # for cn in credit_notes: # key = (str(cn['PMC_No']).strip()) # credit_note_map.setdefault(key, []).append(cn) # hold_amounts = GeneralUse.execute_sp(cursor, 'GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) # gst_releases = GeneralUse.execute_sp(cursor, 'GetGSTReleaseDetailsByPMC', [pmc_no]) # gst_release_map = {} # for gr in gst_releases: # key = (str(gr['PMC_No']).strip()) # gst_release_map.setdefault(key, []).append(gr) # # ================= DATA MAPPING ================= # hold_data = {} # for h in hold_amounts: # hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # # ================= LOG ================= # LogHelper.log_action("Download PMC Report",f"User {current_user.id} Download PMC Report '{pmc_no}'") # ReportHelper.generate_excel( # 0, contractor_info, invoices, hold_types, hold_data, # credit_note_map,gst_release_map, output_file) # return output_folder, filename # finally: # cursor.close() # connection.close()