Moved PeerJobs to using SQLAlchemy, haven't test PostgreSQL yet

This commit is contained in:
Donald Zou 2025-05-10 18:16:29 +08:00
parent 2784059a0f
commit 2e57285120
2 changed files with 132 additions and 66 deletions

View File

@ -71,17 +71,38 @@ Peer Jobs
class PeerJobs: class PeerJobs:
def __init__(self): def __init__(self):
self.Jobs: list[PeerJob] = [] self.Jobs: list[PeerJob] = []
self.jobdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_job.db'),
check_same_thread=False) self.engine = db.create_engine(DashboardConfig.getConnectionString('wgdashboard_job'))
self.jobdb.row_factory = sqlite3.Row self.metadata = db.MetaData()
self.__createPeerJobsDatabase() self.peerJobTable = db.Table('PeerJobs', self.metadata,
db.Column('JobID', db.String, nullable=False, primary_key=True),
db.Column('Configuration', db.String, nullable=False),
db.Column('Peer', db.String, nullable=False),
db.Column('Field', db.String, nullable=False),
db.Column('Operator', db.String, nullable=False),
db.Column('Value', db.String, nullable=False),
db.Column('CreationDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP), nullable=False),
db.Column('ExpireDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP)),
db.Column('Action', db.String, nullable=False),
)
self.metadata.create_all(self.engine)
# self.jobdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_job.db'),
# check_same_thread=False)
# self.jobdb.row_factory = sqlite3.Row
# self.__createPeerJobsDatabase()
self.__getJobs() self.__getJobs()
def __getJobs(self): def __getJobs(self):
self.Jobs.clear() self.Jobs.clear()
with self.jobdb: with self.engine.connect() as conn:
jobdbCursor = self.jobdb.cursor() # jobdbCursor = self.jobdb.cursor()
jobs = jobdbCursor.execute("SELECT * FROM PeerJobs WHERE ExpireDate IS NULL").fetchall() # jobs = jobdbCursor.execute("SELECT * FROM PeerJobs WHERE ExpireDate IS NULL").fetchall()
jobs = conn.execute(self.peerJobTable.select().where(
self.peerJobTable.columns.ExpireDate == None
)).mappings().fetchall()
print(jobs)
for job in jobs: for job in jobs:
self.Jobs.append(PeerJob( self.Jobs.append(PeerJob(
job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'],
@ -89,10 +110,13 @@ class PeerJobs:
def getAllJobs(self, configuration: str = None): def getAllJobs(self, configuration: str = None):
if configuration is not None: if configuration is not None:
with self.jobdb: with self.engine.connect() as conn:
jobdbCursor = self.jobdb.cursor() jobs = conn.execute(self.peerJobTable.select().where(
jobs = jobdbCursor.execute( self.peerJobTable.columns.Configuration == configuration
f"SELECT * FROM PeerJobs WHERE Configuration = ?", (configuration, )).fetchall() )).mappings().fetchall()
# jobdbCursor = self.jobdb.cursor()
# jobs = jobdbCursor.execute(
# f"SELECT * FROM PeerJobs WHERE Configuration = ?", (configuration, )).fetchall()
j = [] j = []
for job in jobs: for job in jobs:
j.append(PeerJob( j.append(PeerJob(
@ -101,20 +125,20 @@ class PeerJobs:
return j return j
return [] return []
def __createPeerJobsDatabase(self): # def __createPeerJobsDatabase(self):
with self.jobdb: # with self.jobdb:
jobdbCursor = self.jobdb.cursor() # jobdbCursor = self.jobdb.cursor()
#
existingTable = jobdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall() # existingTable = jobdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall()
existingTable = [t['name'] for t in existingTable] # existingTable = [t['name'] for t in existingTable]
#
if "PeerJobs" not in existingTable: # if "PeerJobs" not in existingTable:
jobdbCursor.execute(''' # jobdbCursor.execute('''
CREATE TABLE PeerJobs (JobID VARCHAR NOT NULL, Configuration VARCHAR NOT NULL, Peer VARCHAR NOT NULL, # CREATE TABLE PeerJobs (JobID VARCHAR NOT NULL, Configuration VARCHAR NOT NULL, Peer VARCHAR NOT NULL,
Field VARCHAR NOT NULL, Operator VARCHAR NOT NULL, Value VARCHAR NOT NULL, CreationDate DATETIME, # Field VARCHAR NOT NULL, Operator VARCHAR NOT NULL, Value VARCHAR NOT NULL, CreationDate DATETIME,
ExpireDate DATETIME, Action VARCHAR NOT NULL, PRIMARY KEY (JobID)) # ExpireDate DATETIME, Action VARCHAR NOT NULL, PRIMARY KEY (JobID))
''') # ''')
self.jobdb.commit() # self.jobdb.commit()
def toJson(self): def toJson(self):
return [x.toJson() for x in self.Jobs] return [x.toJson() for x in self.Jobs]
@ -126,57 +150,99 @@ class PeerJobs:
return list(filter(lambda x: x.JobID == JobID, self.Jobs)) return list(filter(lambda x: x.JobID == JobID, self.Jobs))
def saveJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]: def saveJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]:
import traceback
try: try:
with self.jobdb: with self.engine.begin() as conn:
jobdbCursor = self.jobdb.cursor() # jobdbCursor = self.jobdb.cursor()
if len(self.searchJobById(Job.JobID)) == 0: # if len(self.searchJobById(Job.JobID)) == 0:
jobdbCursor.execute(''' # jobdbCursor.execute('''
INSERT INTO PeerJobs VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S','now'), NULL, ?) # INSERT INTO PeerJobs VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S','now'), NULL, ?)
''', (Job.JobID, Job.Configuration, Job.Peer, Job.Field, Job.Operator, Job.Value, Job.Action,)) # ''', (Job.JobID, Job.Configuration, Job.Peer, Job.Field, Job.Operator, Job.Value, Job.Action,))
# JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
# else:
# currentJob = jobdbCursor.execute('SELECT * FROM PeerJobs WHERE JobID = ?', (Job.JobID, )).fetchone()
# if currentJob is not None:
# jobdbCursor.execute('''
# UPDATE PeerJobs SET Field = ?, Operator = ?, Value = ?, Action = ? WHERE JobID = ?
# ''', (Job.Field, Job.Operator, Job.Value, Job.Action, Job.JobID))
# JobLogger.log(Job.JobID,
# Message=f"Job is updated from if {currentJob['Field']} {currentJob['Operator']} {currentJob['value']} then {currentJob['Action']}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
#
# self.jobdb.commit()
currentJob = self.searchJobById(Job.JobID)
if len(currentJob) == 0:
conn.execute(
self.peerJobTable.insert().values(
{
"JobID": Job.JobID,
"Configuration": Job.Configuration,
"Peer": Job.Peer,
"Field": Job.Field,
"Operator": Job.Operator,
"Value": Job.Value,
"CreationDate": datetime.now(),
"ExpireDate": None,
"Action": Job.Action
}
)
)
JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
else: else:
currentJob = jobdbCursor.execute('SELECT * FROM PeerJobs WHERE JobID = ?', (Job.JobID, )).fetchone() conn.execute(
if currentJob is not None: self.peerJobTable.update().values({
jobdbCursor.execute(''' "Field": Job.Field,
UPDATE PeerJobs SET Field = ?, Operator = ?, Value = ?, Action = ? WHERE JobID = ? "Operator": Job.Operator,
''', (Job.Field, Job.Operator, Job.Value, Job.Action, Job.JobID)) "Value": Job.Value,
JobLogger.log(Job.JobID, "Action": Job.Action
Message=f"Job is updated from if {currentJob['Field']} {currentJob['Operator']} {currentJob['value']} then {currentJob['Action']}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") }).where(self.peerJobTable.columns.JobID == Job.JobID)
)
JobLogger.log(Job.JobID, Message=f"Job is updated from if {currentJob[0].Field} {currentJob[0].Operator} {currentJob[0].Value} then {currentJob[0].Action}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
self.jobdb.commit()
self.__getJobs() self.__getJobs()
return True, list( return True, list(
filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID, filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID,
self.Jobs)) self.Jobs))
except Exception as e: except Exception as e:
traceback.print_exc()
return False, str(e) return False, str(e)
def deleteJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]: def deleteJob(self, Job: PeerJob) -> tuple[bool, None] | tuple[bool, str]:
try: try:
if (len(str(Job.CreationDate))) == 0: if len(self.searchJobById(Job.JobID)) == 0:
return False, "Job does not exist" return False, "Job does not exist"
with self.jobdb: with self.engine.begin() as conn:
jobdbCursor = self.jobdb.cursor() # jobdbCursor = self.jobdb.cursor()
jobdbCursor.execute(''' # jobdbCursor.execute('''
UPDATE PeerJobs SET ExpireDate = strftime('%Y-%m-%d %H:%M:%S','now') WHERE JobID = ? # UPDATE PeerJobs SET ExpireDate = strftime('%Y-%m-%d %H:%M:%S','now') WHERE JobID = ?
''', (Job.JobID,)) # ''', (Job.JobID,))
self.jobdb.commit() # self.jobdb.commit()
conn.execute(
self.peerJobTable.update().values(
{
"ExpireDate": datetime.now()
}
).where(self.peerJobTable.columns.JobID == Job.JobID)
)
JobLogger.log(Job.JobID, Message=f"Job is removed due to being deleted or finshed.") JobLogger.log(Job.JobID, Message=f"Job is removed due to being deleted or finshed.")
self.__getJobs() self.__getJobs()
return True, list( return True, None
filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID,
self.Jobs))
except Exception as e: except Exception as e:
return False, str(e) return False, str(e)
def updateJobConfigurationName(self, ConfigurationName: str, NewConfigurationName: str) -> tuple[bool, str] | tuple[bool, None]: def updateJobConfigurationName(self, ConfigurationName: str, NewConfigurationName: str) -> tuple[bool, str] | tuple[bool, None]:
try: try:
with self.jobdb: with self.engine.begin() as conn:
jobdbCursor = self.jobdb.cursor() # jobdbCursor = self.jobdb.cursor()
jobdbCursor.execute(''' # jobdbCursor.execute('''
UPDATE PeerJobs SET Configuration = ? WHERE Configuration = ? # UPDATE PeerJobs SET Configuration = ? WHERE Configuration = ?
''', (NewConfigurationName, ConfigurationName, )) # ''', (NewConfigurationName, ConfigurationName, ))
self.jobdb.commit() # self.jobdb.commit()
conn.execute(
self.peerJobTable.update().values({
"Configuration": NewConfigurationName
}).where(self.peerJobTable.columns.Configuration == ConfigurationName)
)
self.__getJobs() self.__getJobs()
return True, None return True, None
except Exception as e: except Exception as e:
@ -2773,7 +2839,7 @@ def API_deletePeerScheduleJob():
job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'],
job['CreationDate'], job['ExpireDate'], job['Action'])) job['CreationDate'], job['ExpireDate'], job['Action']))
if s: if s:
return ResponseObject(s, data=p) return ResponseObject(s)
return ResponseObject(s, message=p) return ResponseObject(s, message=p)
@app.get(f'{APP_PREFIX}/api/getPeerScheduleJobLogs/<configName>') @app.get(f'{APP_PREFIX}/api/getPeerScheduleJobLogs/<configName>')

View File

@ -23,8 +23,8 @@ class PeerJob:
"Field": self.Field, "Field": self.Field,
"Operator": self.Operator, "Operator": self.Operator,
"Value": self.Value, "Value": self.Value,
"CreationDate": self.CreationDate, "CreationDate": self.CreationDate.strftime("%Y-%m-%d %H:%M:%S"),
"ExpireDate": self.ExpireDate, "ExpireDate": (self.ExpireDate.strftime("%Y-%m-%d %H:%M:%S") if self.ExpireDate is not None else None),
"Action": self.Action "Action": self.Action
} }