From 2e57285120467e9a00f0c43b8366ee7c2de18e95 Mon Sep 17 00:00:00 2001 From: Donald Zou Date: Sat, 10 May 2025 18:16:29 +0800 Subject: [PATCH] Moved PeerJobs to using SQLAlchemy, haven't test PostgreSQL yet --- src/dashboard.py | 194 +++++++++++++++++++++++++++-------------- src/modules/PeerJob.py | 4 +- 2 files changed, 132 insertions(+), 66 deletions(-) diff --git a/src/dashboard.py b/src/dashboard.py index 0bf67d6..60cf08e 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -71,17 +71,38 @@ Peer Jobs class PeerJobs: def __init__(self): self.Jobs: list[PeerJob] = [] - self.jobdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_job.db'), - check_same_thread=False) - self.jobdb.row_factory = sqlite3.Row - self.__createPeerJobsDatabase() + + self.engine = db.create_engine(DashboardConfig.getConnectionString('wgdashboard_job')) + self.metadata = db.MetaData() + self.peerJobTable = db.Table('PeerJobs', self.metadata, + db.Column('JobID', db.String, nullable=False, primary_key=True), + db.Column('Configuration', db.String, nullable=False), + db.Column('Peer', db.String, nullable=False), + db.Column('Field', db.String, nullable=False), + db.Column('Operator', db.String, nullable=False), + db.Column('Value', db.String, nullable=False), + db.Column('CreationDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP), nullable=False), + db.Column('ExpireDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP)), + db.Column('Action', db.String, nullable=False), + ) + self.metadata.create_all(self.engine) + + + # self.jobdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_job.db'), + # check_same_thread=False) + # self.jobdb.row_factory = sqlite3.Row + # self.__createPeerJobsDatabase() self.__getJobs() def __getJobs(self): self.Jobs.clear() - with self.jobdb: - jobdbCursor = self.jobdb.cursor() - jobs = jobdbCursor.execute("SELECT * FROM PeerJobs WHERE ExpireDate IS NULL").fetchall() + with self.engine.connect() as conn: + # jobdbCursor = self.jobdb.cursor() + # jobs = jobdbCursor.execute("SELECT * FROM PeerJobs WHERE ExpireDate IS NULL").fetchall() + jobs = conn.execute(self.peerJobTable.select().where( + self.peerJobTable.columns.ExpireDate == None + )).mappings().fetchall() + print(jobs) for job in jobs: self.Jobs.append(PeerJob( job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], @@ -89,10 +110,13 @@ class PeerJobs: def getAllJobs(self, configuration: str = None): if configuration is not None: - with self.jobdb: - jobdbCursor = self.jobdb.cursor() - jobs = jobdbCursor.execute( - f"SELECT * FROM PeerJobs WHERE Configuration = ?", (configuration, )).fetchall() + with self.engine.connect() as conn: + jobs = conn.execute(self.peerJobTable.select().where( + self.peerJobTable.columns.Configuration == configuration + )).mappings().fetchall() + # jobdbCursor = self.jobdb.cursor() + # jobs = jobdbCursor.execute( + # f"SELECT * FROM PeerJobs WHERE Configuration = ?", (configuration, )).fetchall() j = [] for job in jobs: j.append(PeerJob( @@ -101,20 +125,20 @@ class PeerJobs: return j return [] - def __createPeerJobsDatabase(self): - with self.jobdb: - jobdbCursor = self.jobdb.cursor() - - existingTable = jobdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall() - existingTable = [t['name'] for t in existingTable] - - if "PeerJobs" not in existingTable: - jobdbCursor.execute(''' - CREATE TABLE PeerJobs (JobID VARCHAR NOT NULL, Configuration VARCHAR NOT NULL, Peer VARCHAR NOT NULL, - Field VARCHAR NOT NULL, Operator VARCHAR NOT NULL, Value VARCHAR NOT NULL, CreationDate DATETIME, - ExpireDate DATETIME, Action VARCHAR NOT NULL, PRIMARY KEY (JobID)) - ''') - self.jobdb.commit() + # def __createPeerJobsDatabase(self): + # with self.jobdb: + # jobdbCursor = self.jobdb.cursor() + # + # existingTable = jobdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall() + # existingTable = [t['name'] for t in existingTable] + # + # if "PeerJobs" not in existingTable: + # jobdbCursor.execute(''' + # CREATE TABLE PeerJobs (JobID VARCHAR NOT NULL, Configuration VARCHAR NOT NULL, Peer VARCHAR NOT NULL, + # Field VARCHAR NOT NULL, Operator VARCHAR NOT NULL, Value VARCHAR NOT NULL, CreationDate DATETIME, + # ExpireDate DATETIME, Action VARCHAR NOT NULL, PRIMARY KEY (JobID)) + # ''') + # self.jobdb.commit() def toJson(self): return [x.toJson() for x in self.Jobs] @@ -126,57 +150,99 @@ class PeerJobs: return list(filter(lambda x: x.JobID == JobID, self.Jobs)) def saveJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]: + import traceback try: - with self.jobdb: - jobdbCursor = self.jobdb.cursor() - if len(self.searchJobById(Job.JobID)) == 0: - jobdbCursor.execute(''' - INSERT INTO PeerJobs VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S','now'), NULL, ?) - ''', (Job.JobID, Job.Configuration, Job.Peer, Job.Field, Job.Operator, Job.Value, Job.Action,)) + with self.engine.begin() as conn: + # jobdbCursor = self.jobdb.cursor() + # if len(self.searchJobById(Job.JobID)) == 0: + # jobdbCursor.execute(''' + # INSERT INTO PeerJobs VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S','now'), NULL, ?) + # ''', (Job.JobID, Job.Configuration, Job.Peer, Job.Field, Job.Operator, Job.Value, Job.Action,)) + # JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") + # else: + # currentJob = jobdbCursor.execute('SELECT * FROM PeerJobs WHERE JobID = ?', (Job.JobID, )).fetchone() + # if currentJob is not None: + # jobdbCursor.execute(''' + # UPDATE PeerJobs SET Field = ?, Operator = ?, Value = ?, Action = ? WHERE JobID = ? + # ''', (Job.Field, Job.Operator, Job.Value, Job.Action, Job.JobID)) + # JobLogger.log(Job.JobID, + # Message=f"Job is updated from if {currentJob['Field']} {currentJob['Operator']} {currentJob['value']} then {currentJob['Action']}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") + # + # self.jobdb.commit() + currentJob = self.searchJobById(Job.JobID) + if len(currentJob) == 0: + conn.execute( + self.peerJobTable.insert().values( + { + "JobID": Job.JobID, + "Configuration": Job.Configuration, + "Peer": Job.Peer, + "Field": Job.Field, + "Operator": Job.Operator, + "Value": Job.Value, + "CreationDate": datetime.now(), + "ExpireDate": None, + "Action": Job.Action + } + ) + ) JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") else: - currentJob = jobdbCursor.execute('SELECT * FROM PeerJobs WHERE JobID = ?', (Job.JobID, )).fetchone() - if currentJob is not None: - jobdbCursor.execute(''' - UPDATE PeerJobs SET Field = ?, Operator = ?, Value = ?, Action = ? WHERE JobID = ? - ''', (Job.Field, Job.Operator, Job.Value, Job.Action, Job.JobID)) - JobLogger.log(Job.JobID, - Message=f"Job is updated from if {currentJob['Field']} {currentJob['Operator']} {currentJob['value']} then {currentJob['Action']}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") + conn.execute( + self.peerJobTable.update().values({ + "Field": Job.Field, + "Operator": Job.Operator, + "Value": Job.Value, + "Action": Job.Action + }).where(self.peerJobTable.columns.JobID == Job.JobID) + ) + JobLogger.log(Job.JobID, Message=f"Job is updated from if {currentJob[0].Field} {currentJob[0].Operator} {currentJob[0].Value} then {currentJob[0].Action}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") - self.jobdb.commit() - self.__getJobs() - return True, list( - filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID, - self.Jobs)) - except Exception as e: - return False, str(e) - - def deleteJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]: - try: - if (len(str(Job.CreationDate))) == 0: - return False, "Job does not exist" - with self.jobdb: - jobdbCursor = self.jobdb.cursor() - jobdbCursor.execute(''' - UPDATE PeerJobs SET ExpireDate = strftime('%Y-%m-%d %H:%M:%S','now') WHERE JobID = ? - ''', (Job.JobID,)) - self.jobdb.commit() - JobLogger.log(Job.JobID, Message=f"Job is removed due to being deleted or finshed.") self.__getJobs() return True, list( filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID, self.Jobs)) + except Exception as e: + traceback.print_exc() + return False, str(e) + + def deleteJob(self, Job: PeerJob) -> tuple[bool, None] | tuple[bool, str]: + try: + if len(self.searchJobById(Job.JobID)) == 0: + return False, "Job does not exist" + with self.engine.begin() as conn: + # jobdbCursor = self.jobdb.cursor() + # jobdbCursor.execute(''' + # UPDATE PeerJobs SET ExpireDate = strftime('%Y-%m-%d %H:%M:%S','now') WHERE JobID = ? + # ''', (Job.JobID,)) + # self.jobdb.commit() + conn.execute( + self.peerJobTable.update().values( + { + "ExpireDate": datetime.now() + } + ).where(self.peerJobTable.columns.JobID == Job.JobID) + ) + JobLogger.log(Job.JobID, Message=f"Job is removed due to being deleted or finshed.") + self.__getJobs() + return True, None except Exception as e: return False, str(e) def updateJobConfigurationName(self, ConfigurationName: str, NewConfigurationName: str) -> tuple[bool, str] | tuple[bool, None]: try: - with self.jobdb: - jobdbCursor = self.jobdb.cursor() - jobdbCursor.execute(''' - UPDATE PeerJobs SET Configuration = ? WHERE Configuration = ? - ''', (NewConfigurationName, ConfigurationName, )) - self.jobdb.commit() + with self.engine.begin() as conn: + # jobdbCursor = self.jobdb.cursor() + # jobdbCursor.execute(''' + # UPDATE PeerJobs SET Configuration = ? WHERE Configuration = ? + # ''', (NewConfigurationName, ConfigurationName, )) + # self.jobdb.commit() + conn.execute( + self.peerJobTable.update().values({ + "Configuration": NewConfigurationName + }).where(self.peerJobTable.columns.Configuration == ConfigurationName) + ) + self.__getJobs() return True, None except Exception as e: @@ -2773,7 +2839,7 @@ def API_deletePeerScheduleJob(): job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], job['CreationDate'], job['ExpireDate'], job['Action'])) if s: - return ResponseObject(s, data=p) + return ResponseObject(s) return ResponseObject(s, message=p) @app.get(f'{APP_PREFIX}/api/getPeerScheduleJobLogs/') diff --git a/src/modules/PeerJob.py b/src/modules/PeerJob.py index 83d75c3..2800ba8 100644 --- a/src/modules/PeerJob.py +++ b/src/modules/PeerJob.py @@ -23,8 +23,8 @@ class PeerJob: "Field": self.Field, "Operator": self.Operator, "Value": self.Value, - "CreationDate": self.CreationDate, - "ExpireDate": self.ExpireDate, + "CreationDate": self.CreationDate.strftime("%Y-%m-%d %H:%M:%S"), + "ExpireDate": (self.ExpireDate.strftime("%Y-%m-%d %H:%M:%S") if self.ExpireDate is not None else None), "Action": self.Action }