695 lines
24 KiB
Python
695 lines
24 KiB
Python
# from flask_login import current_user
|
|
# from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
|
|
# from model.Log import LogHelper
|
|
|
|
# import config
|
|
# import re
|
|
# import mysql.connector
|
|
|
|
|
|
# # ----------------------------------------------------------
|
|
# # Mapping Class
|
|
# # ----------------------------------------------------------
|
|
# class itemCRUDMapping:
|
|
|
|
# def __init__(self, itemType):
|
|
# if itemType is ItemCRUDType.Village:
|
|
# self.name = "Village"
|
|
# elif itemType is ItemCRUDType.Block:
|
|
# self.name = "Block"
|
|
# elif itemType is ItemCRUDType.State:
|
|
# self.name = "State"
|
|
# elif itemType is ItemCRUDType.HoldType:
|
|
# self.name = "Hold Type"
|
|
# elif itemType is ItemCRUDType.Subcontractor:
|
|
# self.name = "Subcontractor"
|
|
# else:
|
|
# self.name = "Item"
|
|
|
|
|
|
# # ----------------------------------------------------------
|
|
# # Generic CRUD Class
|
|
# # ----------------------------------------------------------
|
|
# class ItemCRUD:
|
|
|
|
# def __init__(self, itemType):
|
|
# self.isSuccess = False
|
|
# self.resultMessage = ""
|
|
# self.itemCRUDType = itemType
|
|
# self.itemCRUDMapping = itemCRUDMapping(itemType)
|
|
|
|
# # ----------------------------------------------------------
|
|
# # DELETE
|
|
# # ----------------------------------------------------------
|
|
# def DeleteItem(self, request, itemID, storedprocDelete):
|
|
|
|
# connection = config.get_db_connection()
|
|
# cursor = connection.cursor()
|
|
|
|
# LogHelper.log_action(
|
|
# f"Delete {self.itemCRUDMapping.name}",
|
|
# f"User {current_user.id} deleted {self.itemCRUDMapping.name} '{itemID}'"
|
|
# )
|
|
|
|
# try:
|
|
# cursor.callproc(storedprocDelete, (itemID,))
|
|
# connection.commit()
|
|
|
|
# self.isSuccess = True
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.delete_success(self.itemCRUDMapping.name), 200
|
|
# )
|
|
|
|
# except mysql.connector.Error as e:
|
|
# print(f"Error deleting {self.itemCRUDMapping.name}: {e}")
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.delete_failure(self.itemCRUDMapping.name), 500
|
|
# )
|
|
|
|
# finally:
|
|
# cursor.close()
|
|
# connection.close()
|
|
|
|
# # ----------------------------------------------------------
|
|
# # ADD
|
|
# # ----------------------------------------------------------
|
|
# def AddItem(self, request, parentid=None, childname=None, storedprocfetch=None, storedprocadd=None, data=None):
|
|
|
|
# connection = config.get_db_connection()
|
|
# if not connection:
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.db_connection_failure(), 500
|
|
# )
|
|
# return
|
|
|
|
# cursor = connection.cursor()
|
|
|
|
# LogHelper.log_action(
|
|
# f"Add {self.itemCRUDMapping.name}",
|
|
# f"User {current_user.id} adding '{childname if childname else (data.get('Contractor_Name') if data else '')}'"
|
|
# )
|
|
|
|
# try:
|
|
# # ======================================================
|
|
# # SUBCONTRACTOR (MULTI-FIELD)
|
|
# # ======================================================
|
|
# if data:
|
|
|
|
# # Duplicate check
|
|
# cursor.callproc(storedprocfetch, (data['Contractor_Name'],))
|
|
|
|
# existing_item = None
|
|
# for rs in cursor.stored_results():
|
|
# existing_item = rs.fetchone()
|
|
|
|
# if existing_item:
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
|
|
# )
|
|
# return
|
|
|
|
# # Insert
|
|
# cursor.callproc(storedprocadd, (
|
|
# data['Contractor_Name'],
|
|
# data['Address'],
|
|
# data['Mobile_No'],
|
|
# data['PAN_No'],
|
|
# data['Email'],
|
|
# data['Gender'],
|
|
# data['GST_Registration_Type'],
|
|
# data['GST_No'],
|
|
# data['Contractor_password']
|
|
# ))
|
|
|
|
# connection.commit()
|
|
|
|
# self.isSuccess = True
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.add_success(self.itemCRUDMapping.name), 200
|
|
# )
|
|
# return
|
|
|
|
# # ======================================================
|
|
# # NORMAL (Village / Block / State)
|
|
# # ======================================================
|
|
# if not re.match(RegEx.patternAlphabetOnly, childname):
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
|
|
# )
|
|
# return
|
|
|
|
# # Duplicate check
|
|
# if parentid is None:
|
|
# cursor.callproc(storedprocfetch, (childname,))
|
|
# else:
|
|
# cursor.callproc(storedprocfetch, (childname, parentid))
|
|
|
|
# existing_item = None
|
|
# for rs in cursor.stored_results():
|
|
# existing_item = rs.fetchone()
|
|
|
|
# if existing_item:
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
|
|
# )
|
|
# return
|
|
|
|
# # Insert
|
|
# if parentid is None:
|
|
# cursor.callproc(storedprocadd, (childname,))
|
|
# else:
|
|
# cursor.callproc(storedprocadd, (childname, parentid))
|
|
|
|
# connection.commit()
|
|
|
|
# self.isSuccess = True
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
|
|
# ResponseHandler.add_success(self.itemCRUDMapping.name), 200
|
|
# )
|
|
|
|
# except mysql.connector.Error as e:
|
|
# print(f"Database Error: {e}")
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.add_failure(self.itemCRUDMapping.name), 500
|
|
# )
|
|
|
|
# finally:
|
|
# cursor.close()
|
|
# connection.close()
|
|
|
|
# # ----------------------------------------------------------
|
|
# # EDIT
|
|
# # ----------------------------------------------------------
|
|
# def EditItem(self, request, childid, parentid=None, childname=None, storedprocupdate=None, data=None):
|
|
|
|
# connection = config.get_db_connection()
|
|
# cursor = connection.cursor()
|
|
|
|
# LogHelper.log_action(
|
|
# f"Edit {self.itemCRUDMapping.name}",
|
|
# f"User {current_user.id} edited '{childid}'"
|
|
# )
|
|
|
|
# try:
|
|
# # ======================================================
|
|
# # SUBCONTRACTOR (MULTI-FIELD)
|
|
# # ======================================================
|
|
# if data:
|
|
# cursor.callproc(storedprocupdate, (
|
|
# childid,
|
|
# data['Contractor_Name'],
|
|
# data['Address'],
|
|
# data['Mobile_No'],
|
|
# data['PAN_No'],
|
|
# data['Email'],
|
|
# data['Gender'],
|
|
# data['GST_Registration_Type'],
|
|
# data['GST_No'],
|
|
# data['Contractor_password']
|
|
# ))
|
|
|
|
# connection.commit()
|
|
|
|
# self.isSuccess = True
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.update_success(self.itemCRUDMapping.name), 200
|
|
# )
|
|
# return
|
|
|
|
# # ======================================================
|
|
# # NORMAL
|
|
# # ======================================================
|
|
# if not re.match(RegEx.patternAlphabetOnly, childname):
|
|
# self.isSuccess = False
|
|
# self.resultMessage = ResponseHandler.update_failure(self.itemCRUDMapping.name)['message']
|
|
# return
|
|
|
|
# if parentid is None:
|
|
# cursor.callproc(storedprocupdate, (childid, childname))
|
|
# else:
|
|
# cursor.callproc(storedprocupdate, (childid, parentid, childname))
|
|
|
|
# connection.commit()
|
|
|
|
# self.isSuccess = True
|
|
# self.resultMessage = ResponseHandler.update_success(self.itemCRUDMapping.name)['message']
|
|
|
|
# except mysql.connector.Error as e:
|
|
# print(f"Error updating {self.itemCRUDMapping.name}: {e}")
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.update_failure(self.itemCRUDMapping.name), 500
|
|
# )
|
|
|
|
# finally:
|
|
# cursor.close()
|
|
# connection.close()
|
|
|
|
# # ----------------------------------------------------------
|
|
# # GET ALL
|
|
# # ----------------------------------------------------------
|
|
# def GetAllData(self, request, storedproc):
|
|
|
|
# data = []
|
|
# connection = config.get_db_connection()
|
|
|
|
# if not connection:
|
|
# return []
|
|
|
|
# cursor = connection.cursor()
|
|
|
|
# try:
|
|
# cursor.callproc(storedproc)
|
|
|
|
# for result in cursor.stored_results():
|
|
# data = result.fetchall()
|
|
|
|
# self.isSuccess = True
|
|
|
|
# except mysql.connector.Error as e:
|
|
# print(f"Error fetching {self.itemCRUDMapping.name}: {e}")
|
|
# self.isSuccess = False
|
|
# self.resultMessage = HtmlHelper.json_response(
|
|
# ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500
|
|
# )
|
|
# return []
|
|
|
|
# finally:
|
|
# cursor.close()
|
|
# connection.close()
|
|
|
|
# return data
|
|
|
|
# # ----------------------------------------------------------
|
|
# # GET BY ID
|
|
# # ----------------------------------------------------------
|
|
# def GetDataByID(self, id, storedproc):
|
|
|
|
# data = None
|
|
# connection = config.get_db_connection()
|
|
# cursor = connection.cursor()
|
|
|
|
# try:
|
|
# cursor.callproc(storedproc, (id,))
|
|
|
|
# for rs in cursor.stored_results():
|
|
# data = rs.fetchone()
|
|
|
|
# except mysql.connector.Error as e:
|
|
# print(f"Error fetching {self.itemCRUDMapping.name}: {e}")
|
|
|
|
# finally:
|
|
# cursor.close()
|
|
# connection.close()
|
|
|
|
# return data
|
|
|
|
# # ----------------------------------------------------------
|
|
# # CHECK ITEM
|
|
# # ----------------------------------------------------------
|
|
# def CheckItem(self, request, parentid, childname, storedprocfetch):
|
|
|
|
# connection = config.get_db_connection()
|
|
# cursor = connection.cursor()
|
|
|
|
# LogHelper.log_action(
|
|
# f"Check {self.itemCRUDMapping.name}",
|
|
# f"User {current_user.id} checked '{childname}'"
|
|
# )
|
|
|
|
# if not re.match(RegEx.patternAlphabetOnly, childname):
|
|
# return HtmlHelper.json_response(
|
|
# ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
|
|
# )
|
|
|
|
# try:
|
|
# if parentid is None:
|
|
# cursor.callproc(storedprocfetch, (childname,))
|
|
# else:
|
|
# cursor.callproc(storedprocfetch, (childname, parentid))
|
|
|
|
# existing_item = None
|
|
# for rs in cursor.stored_results():
|
|
# existing_item = rs.fetchone()
|
|
|
|
# if existing_item:
|
|
# return HtmlHelper.json_response(
|
|
# ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
|
|
# )
|
|
|
|
# return HtmlHelper.json_response(
|
|
# ResponseHandler.is_available(self.itemCRUDMapping.name), 200
|
|
# )
|
|
|
|
# except mysql.connector.Error as e:
|
|
# print(f"Error checking {self.itemCRUDMapping.name}: {e}")
|
|
# return HtmlHelper.json_response(
|
|
# ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500
|
|
# )
|
|
|
|
# finally:
|
|
# cursor.close()
|
|
# connection.close()
|
|
|
|
|
|
from flask_login import current_user
|
|
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
|
|
from model.Log import LogHelper
|
|
|
|
import config
|
|
import re
|
|
import mysql.connector
|
|
|
|
|
|
# ----------------------------------------------------------
|
|
# Mapping Class
|
|
# ----------------------------------------------------------
|
|
class itemCRUDMapping:
    """Resolve the human-readable name for an ``ItemCRUDType`` value.

    The resolved label is stored on ``self.name``; values that match none of
    the known members fall back to the generic label ``"Item"``.
    """

    # (ItemCRUDType attribute, display label) pairs, probed in this order.
    _DISPLAY_NAMES = (
        ("Village", "Village"),
        ("Block", "Block"),
        ("State", "State"),
        ("HoldType", "Hold Type"),
        ("Subcontractor", "Subcontractor"),
    )

    def __init__(self, itemType):
        label = "Item"
        for member_name, display_name in self._DISPLAY_NAMES:
            # Identity comparison, looked up lazily so members are only
            # touched until the first match is found.
            if itemType is getattr(ItemCRUDType, member_name):
                label = display_name
                break
        self.name = label
|
|
|
|
|
|
# ----------------------------------------------------------
|
|
# Generic CRUD Class
|
|
# ----------------------------------------------------------
|
|
class ItemCRUD:
    """Stored-procedure backed CRUD helper for a single item type.

    Mutating methods (``DeleteItem``, ``AddItem``, ``EditItem``) do not return
    a value; they record their outcome on the instance:

    * ``isSuccess``     -- ``True`` when the operation completed.
    * ``response``      -- the dict produced by ``ResponseHandler``.
    * ``resultMessage`` -- ``response["message"]``.

    ``GetAllData`` / ``GetDataByID`` return the fetched rows, and
    ``CheckItem`` returns an ``HtmlHelper.json_response`` directly.

    Every method obtains its own connection from ``config.get_db_connection()``
    and always closes the cursor and connection before returning.
    """

    def __init__(self, itemType):
        self.isSuccess = False
        self.resultMessage = ""
        self.response = {}
        self.itemCRUDType = itemType
        self.itemCRUDMapping = itemCRUDMapping(itemType)

    # ----------------------------------------------------------
    # Private helpers
    # ----------------------------------------------------------
    def _set_result(self, success, response):
        """Record the outcome of an operation on the instance."""
        self.isSuccess = success
        self.response = response
        self.resultMessage = response["message"]

    @staticmethod
    def _release(cursor, connection):
        """Close the cursor (if one was created) and then the connection."""
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Close the connection even if closing the cursor raised.
            connection.close()

    @staticmethod
    def _last_row(cursor):
        """Return ``fetchone()`` of the last stored result set, or ``None``."""
        row = None
        for result_set in cursor.stored_results():
            row = result_set.fetchone()
        return row

    @staticmethod
    def _is_valid_name(childname):
        """Return True when ``childname`` is a string matching the name pattern."""
        # A missing name would make re.match raise TypeError; treat it as invalid.
        if not isinstance(childname, str):
            return False
        return re.match(RegEx.patternAlphabetOnly, childname) is not None

    @staticmethod
    def _subcontractor_fields(data):
        """Return the subcontractor columns in stored-procedure argument order."""
        return (
            data['Contractor_Name'],
            data['Address'],
            data['Mobile_No'],
            data['PAN_No'],
            data['Email'],
            data['Gender'],
            data['GST_Registration_Type'],
            data['GST_No'],
            data['Contractor_password'],
        )

    # ----------------------------------------------------------
    # DELETE
    # ----------------------------------------------------------
    def DeleteItem(self, request, itemID, storedprocDelete):
        """Delete ``itemID`` by calling ``storedprocDelete`` and commit.

        The outcome is stored on the instance; nothing is returned. A failed
        connection is reported as a delete failure.
        """
        name = self.itemCRUDMapping.name

        connection = config.get_db_connection()
        if not connection:
            self._set_result(False, ResponseHandler.delete_failure(name))
            return

        cursor = None
        try:
            cursor = connection.cursor()

            LogHelper.log_action(
                f"Delete {name}",
                f"User {current_user.id} deleted {name} '{itemID}'"
            )

            try:
                cursor.callproc(storedprocDelete, (itemID,))
                connection.commit()
                self._set_result(True, ResponseHandler.delete_success(name))
            except mysql.connector.Error as e:
                print(f"Error deleting {name}: {e}")
                self._set_result(False, ResponseHandler.delete_failure(name))
        finally:
            self._release(cursor, connection)

    # ----------------------------------------------------------
    # ADD
    # ----------------------------------------------------------
    def AddItem(self, request, parentid=None, childname=None, storedprocfetch=None, storedprocadd=None, data=None):
        """Insert a new item after checking that it does not already exist.

        When ``data`` is truthy it is treated as a multi-field subcontractor
        record: the duplicate check uses ``data['Contractor_Name']`` and the
        insert receives all subcontractor fields. Otherwise ``childname`` is
        validated against ``RegEx.patternAlphabetOnly`` and both procedures
        are called with ``(childname,)`` or ``(childname, parentid)``.

        The outcome is stored on the instance; nothing is returned.
        """
        name = self.itemCRUDMapping.name

        connection = config.get_db_connection()
        if not connection:
            self._set_result(False, ResponseHandler.add_failure(name))
            return

        cursor = None
        try:
            cursor = connection.cursor()

            logged_name = childname or (data.get('Contractor_Name') if data else '')
            LogHelper.log_action(
                f"Add {name}",
                f"User {current_user.id} adding '{logged_name}'"
            )

            try:
                if data:
                    # SUBCONTRACTOR (multi-field)
                    child_params = None
                    lookup_params = (data['Contractor_Name'],)
                else:
                    # NORMAL (single name, optional parent)
                    if not self._is_valid_name(childname):
                        self._set_result(False, ResponseHandler.invalid_name(name))
                        return
                    child_params = (childname,) if parentid is None else (childname, parentid)
                    lookup_params = child_params

                # Duplicate check
                cursor.callproc(storedprocfetch, lookup_params)
                if self._last_row(cursor):
                    self._set_result(False, ResponseHandler.already_exists(name))
                    return

                # Insert (subcontractor fields are only read once the
                # duplicate check has passed)
                insert_params = self._subcontractor_fields(data) if data else child_params
                cursor.callproc(storedprocadd, insert_params)
                connection.commit()

                self._set_result(True, ResponseHandler.add_success(name))
            except mysql.connector.Error as e:
                print(f"Database Error: {e}")
                self._set_result(False, ResponseHandler.add_failure(name))
        finally:
            self._release(cursor, connection)

    # ----------------------------------------------------------
    # EDIT
    # ----------------------------------------------------------
    def EditItem(self, request, childid, parentid=None, childname=None, storedprocupdate=None, data=None):
        """Update item ``childid`` by calling ``storedprocupdate`` and commit.

        When ``data`` is truthy the procedure receives ``childid`` followed by
        all subcontractor fields. Otherwise ``childname`` is validated and the
        procedure receives ``(childid, childname)`` or
        ``(childid, parentid, childname)``.

        The outcome is stored on the instance; nothing is returned. A failed
        connection is reported as an update failure.
        """
        name = self.itemCRUDMapping.name

        connection = config.get_db_connection()
        if not connection:
            self._set_result(False, ResponseHandler.update_failure(name))
            return

        cursor = None
        try:
            cursor = connection.cursor()

            LogHelper.log_action(
                f"Edit {name}",
                f"User {current_user.id} edited '{childid}'"
            )

            try:
                if data:
                    update_params = (childid,) + self._subcontractor_fields(data)
                else:
                    if not self._is_valid_name(childname):
                        # NOTE(review): an invalid name is reported as a generic
                        # update failure here, while AddItem reports invalid_name.
                        self._set_result(False, ResponseHandler.update_failure(name))
                        return
                    if parentid is None:
                        update_params = (childid, childname)
                    else:
                        update_params = (childid, parentid, childname)

                cursor.callproc(storedprocupdate, update_params)
                connection.commit()

                self._set_result(True, ResponseHandler.update_success(name))
            except mysql.connector.Error as e:
                print(f"Error updating {name}: {e}")
                self._set_result(False, ResponseHandler.update_failure(name))
        finally:
            self._release(cursor, connection)

    # ----------------------------------------------------------
    # GET ALL
    # ----------------------------------------------------------
    def GetAllData(self, request, storedproc):
        """Return all rows of the last result set produced by ``storedproc``.

        Returns ``[]`` when no connection is available or the call fails; in
        both cases the failure is also recorded on the instance.
        """
        name = self.itemCRUDMapping.name

        connection = config.get_db_connection()
        if not connection:
            # Record the failure so a stale success flag is not left behind.
            self._set_result(False, ResponseHandler.fetch_failure(name))
            return []

        rows = []
        cursor = None
        try:
            cursor = connection.cursor()
            try:
                cursor.callproc(storedproc)
                for result_set in cursor.stored_results():
                    rows = result_set.fetchall()
                self.isSuccess = True
            except mysql.connector.Error as e:
                print(f"Error fetching {name}: {e}")
                self._set_result(False, ResponseHandler.fetch_failure(name))
                return []
        finally:
            self._release(cursor, connection)

        return rows

    # ----------------------------------------------------------
    # GET BY ID
    # ----------------------------------------------------------
    def GetDataByID(self, id, storedproc):
        """Return the first row of the last result set of ``storedproc(id)``.

        Returns ``None`` when no connection is available, the call fails, or
        no row is found. Instance state is not modified.
        """
        connection = config.get_db_connection()
        if not connection:
            return None

        row = None
        cursor = None
        try:
            cursor = connection.cursor()
            try:
                cursor.callproc(storedproc, (id,))
                row = self._last_row(cursor)
            except mysql.connector.Error as e:
                print(f"Error fetching {self.itemCRUDMapping.name}: {e}")
        finally:
            self._release(cursor, connection)

        return row

    # ----------------------------------------------------------
    # CHECK ITEM (API use: returns a JSON response directly)
    # ----------------------------------------------------------
    def CheckItem(self, request, parentid, childname, storedprocfetch):
        """Report whether ``childname`` is valid and not already present.

        Returns ``HtmlHelper.json_response(...)`` with status 400 for an
        invalid name, 409 when ``storedprocfetch`` finds an existing row,
        200 when the name is available, and 500 on a database error or when
        no connection is available.
        """
        name = self.itemCRUDMapping.name

        connection = config.get_db_connection()
        if not connection:
            return HtmlHelper.json_response(
                ResponseHandler.fetch_failure(name), 500
            )

        cursor = None
        try:
            cursor = connection.cursor()

            LogHelper.log_action(
                f"Check {name}",
                f"User {current_user.id} checked '{childname}'"
            )

            # Validation happens inside the try/finally so the early return
            # still releases the cursor and connection.
            if not self._is_valid_name(childname):
                return HtmlHelper.json_response(
                    ResponseHandler.invalid_name(name), 400
                )

            try:
                lookup_params = (childname,) if parentid is None else (childname, parentid)
                cursor.callproc(storedprocfetch, lookup_params)

                if self._last_row(cursor):
                    return HtmlHelper.json_response(
                        ResponseHandler.already_exists(name), 409
                    )

                return HtmlHelper.json_response(
                    ResponseHandler.is_available(name), 200
                )
            except mysql.connector.Error as e:
                print(f"Error checking {name}: {e}")
                return HtmlHelper.json_response(
                    ResponseHandler.fetch_failure(name), 500
                )
        finally:
            self._release(cursor, connection)