From 964a6c2e3e6b6b50b5865f1ecdd39af7e200af62 Mon Sep 17 00:00:00 2001 From: Donald Zou Date: Tue, 13 May 2025 21:36:15 +0800 Subject: [PATCH] Tested with PostgreSQL and moved PeerJobLogger into its own file --- src/dashboard.py | 255 +----------------- src/modules/PeerJobLogger.py | 8 +- src/modules/PeerJobs.py | 190 +++++++++++++ .../peerJobsLogsModal.vue | 5 +- 4 files changed, 209 insertions(+), 249 deletions(-) create mode 100644 src/modules/PeerJobs.py diff --git a/src/dashboard.py b/src/dashboard.py index 58b020b..2f05480 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -25,6 +25,8 @@ from modules.PeerJob import PeerJob from modules.SystemStatus import SystemStatus from modules.PeerShareLinks import PeerShareLinks from modules.DashboardAPIKey import DashboardAPIKey +from modules.PeerJobs import PeerJobs + SystemStatus = SystemStatus() from sqlalchemy_utils import database_exists, create_database @@ -65,242 +67,7 @@ def ResponseObject(status=True, message=None, data=None, status_code = 200) -> F response.content_type = "application/json" return response -""" -Peer Jobs -""" -class PeerJobs: - def __init__(self): - self.Jobs: list[PeerJob] = [] - - self.engine = db.create_engine(DashboardConfig.getConnectionString('wgdashboard_job')) - self.metadata = db.MetaData() - self.peerJobTable = db.Table('PeerJobs', self.metadata, - db.Column('JobID', db.String, nullable=False, primary_key=True), - db.Column('Configuration', db.String, nullable=False), - db.Column('Peer', db.String, nullable=False), - db.Column('Field', db.String, nullable=False), - db.Column('Operator', db.String, nullable=False), - db.Column('Value', db.String, nullable=False), - db.Column('CreationDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP), nullable=False), - db.Column('ExpireDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP)), - db.Column('Action', db.String, nullable=False), - ) - self.metadata.create_all(self.engine) - - - # self.jobdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_job.db'), - # check_same_thread=False) - # self.jobdb.row_factory = sqlite3.Row - # self.__createPeerJobsDatabase() - self.__getJobs() - def __getJobs(self): - self.Jobs.clear() - with self.engine.connect() as conn: - # jobdbCursor = self.jobdb.cursor() - # jobs = jobdbCursor.execute("SELECT * FROM PeerJobs WHERE ExpireDate IS NULL").fetchall() - jobs = conn.execute(self.peerJobTable.select().where( - self.peerJobTable.columns.ExpireDate == None - )).mappings().fetchall() - print(jobs) - for job in jobs: - self.Jobs.append(PeerJob( - job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], - job['CreationDate'], job['ExpireDate'], job['Action'])) - - def getAllJobs(self, configuration: str = None): - if configuration is not None: - with self.engine.connect() as conn: - jobs = conn.execute(self.peerJobTable.select().where( - self.peerJobTable.columns.Configuration == configuration - )).mappings().fetchall() - # jobdbCursor = self.jobdb.cursor() - # jobs = jobdbCursor.execute( - # f"SELECT * FROM PeerJobs WHERE Configuration = ?", (configuration, )).fetchall() - j = [] - for job in jobs: - j.append(PeerJob( - job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], - job['CreationDate'], job['ExpireDate'], job['Action'])) - return j - return [] - - # def __createPeerJobsDatabase(self): - # with self.jobdb: - # jobdbCursor = self.jobdb.cursor() - # - # existingTable = jobdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall() - # existingTable = [t['name'] for t in existingTable] - # - # if "PeerJobs" not in existingTable: - # jobdbCursor.execute(''' - # CREATE TABLE PeerJobs (JobID VARCHAR NOT NULL, Configuration VARCHAR NOT NULL, Peer VARCHAR NOT NULL, - # Field VARCHAR NOT NULL, Operator VARCHAR NOT NULL, Value VARCHAR NOT NULL, CreationDate DATETIME, - # ExpireDate DATETIME, Action VARCHAR NOT NULL, PRIMARY KEY (JobID)) - # ''') - # self.jobdb.commit() - - def toJson(self): - return [x.toJson() for x in self.Jobs] - - def searchJob(self, Configuration: str, Peer: str): - return list(filter(lambda x: x.Configuration == Configuration and x.Peer == Peer, self.Jobs)) - - def searchJobById(self, JobID): - return list(filter(lambda x: x.JobID == JobID, self.Jobs)) - - def saveJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]: - import traceback - try: - with self.engine.begin() as conn: - # jobdbCursor = self.jobdb.cursor() - # if len(self.searchJobById(Job.JobID)) == 0: - # jobdbCursor.execute(''' - # INSERT INTO PeerJobs VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S','now'), NULL, ?) - # ''', (Job.JobID, Job.Configuration, Job.Peer, Job.Field, Job.Operator, Job.Value, Job.Action,)) - # JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") - # else: - # currentJob = jobdbCursor.execute('SELECT * FROM PeerJobs WHERE JobID = ?', (Job.JobID, )).fetchone() - # if currentJob is not None: - # jobdbCursor.execute(''' - # UPDATE PeerJobs SET Field = ?, Operator = ?, Value = ?, Action = ? WHERE JobID = ? - # ''', (Job.Field, Job.Operator, Job.Value, Job.Action, Job.JobID)) - # JobLogger.log(Job.JobID, - # Message=f"Job is updated from if {currentJob['Field']} {currentJob['Operator']} {currentJob['value']} then {currentJob['Action']}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") - # - # self.jobdb.commit() - currentJob = self.searchJobById(Job.JobID) - if len(currentJob) == 0: - conn.execute( - self.peerJobTable.insert().values( - { - "JobID": Job.JobID, - "Configuration": Job.Configuration, - "Peer": Job.Peer, - "Field": Job.Field, - "Operator": Job.Operator, - "Value": Job.Value, - "CreationDate": datetime.now(), - "ExpireDate": None, - "Action": Job.Action - } - ) - ) - JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") - else: - conn.execute( - self.peerJobTable.update().values({ - "Field": Job.Field, - "Operator": Job.Operator, - "Value": Job.Value, - "Action": Job.Action - }).where(self.peerJobTable.columns.JobID == Job.JobID) - ) - JobLogger.log(Job.JobID, Message=f"Job is updated from if {currentJob[0].Field} {currentJob[0].Operator} {currentJob[0].Value} then {currentJob[0].Action}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") - - self.__getJobs() - return True, list( - filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID, - self.Jobs)) - except Exception as e: - traceback.print_exc() - return False, str(e) - - def deleteJob(self, Job: PeerJob) -> tuple[bool, None] | tuple[bool, str]: - try: - if len(self.searchJobById(Job.JobID)) == 0: - return False, "Job does not exist" - with self.engine.begin() as conn: - # jobdbCursor = self.jobdb.cursor() - # jobdbCursor.execute(''' - # UPDATE PeerJobs SET ExpireDate = strftime('%Y-%m-%d %H:%M:%S','now') WHERE JobID = ? - # ''', (Job.JobID,)) - # self.jobdb.commit() - conn.execute( - self.peerJobTable.update().values( - { - "ExpireDate": datetime.now() - } - ).where(self.peerJobTable.columns.JobID == Job.JobID) - ) - JobLogger.log(Job.JobID, Message=f"Job is removed due to being deleted or finshed.") - self.__getJobs() - return True, None - except Exception as e: - return False, str(e) - - def updateJobConfigurationName(self, ConfigurationName: str, NewConfigurationName: str) -> tuple[bool, str] | tuple[bool, None]: - try: - with self.engine.begin() as conn: - # jobdbCursor = self.jobdb.cursor() - # jobdbCursor.execute(''' - # UPDATE PeerJobs SET Configuration = ? WHERE Configuration = ? - # ''', (NewConfigurationName, ConfigurationName, )) - # self.jobdb.commit() - conn.execute( - self.peerJobTable.update().values({ - "Configuration": NewConfigurationName - }).where(self.peerJobTable.columns.Configuration == ConfigurationName) - ) - - self.__getJobs() - return True, None - except Exception as e: - return False, str(e) - - - def runJob(self): - needToDelete = [] - self.__getJobs() - for job in self.Jobs: - c = WireguardConfigurations.get(job.Configuration) - if c is not None: - f, fp = c.searchPeer(job.Peer) - if f: - if job.Field in ["total_receive", "total_sent", "total_data"]: - s = job.Field.split("_")[1] - x: float = getattr(fp, f"total_{s}") + getattr(fp, f"cumu_{s}") - y: float = float(job.Value) - else: - x: datetime = datetime.now() - y: datetime = datetime.strptime(job.Value, "%Y-%m-%d %H:%M:%S") - runAction: bool = self.__runJob_Compare(x, y, job.Operator) - if runAction: - s = False - if job.Action == "restrict": - s = c.restrictPeers([fp.id]).get_json() - elif job.Action == "delete": - s = c.deletePeers([fp.id]).get_json() - - if s['status'] is True: - JobLogger.log(job.JobID, s["status"], - f"Peer {fp.id} from {c.Name} is successfully {job.Action}ed." - ) - needToDelete.append(job) - else: - JobLogger.log(job.JobID, s["status"], - f"Peer {fp.id} from {c.Name} failed {job.Action}ed." - ) - else: - JobLogger.log(job.JobID, False, - f"Somehow can't find this peer {job.Peer} from {c.Name} failed {job.Action}ed." - ) - else: - JobLogger.log(job.JobID, False, - f"Somehow can't find this peer {job.Peer} from {job.Configuration} failed {job.Action}ed." - ) - for j in needToDelete: - self.deleteJob(j) - - def __runJob_Compare(self, x: float | datetime, y: float | datetime, operator: str): - if operator == "eq": - return x == y - if operator == "neq": - return x != y - if operator == "lgt": - return x > y - if operator == "lst": - return x < y """ WireGuard Configuration @@ -2832,9 +2599,9 @@ def API_deletePeerScheduleJob(): configuration = WireguardConfigurations.get(job['Configuration']) if configuration is None: return ResponseObject(False, "Configuration does not exist") - f, fp = configuration.searchPeer(job['Peer']) - if not f: - return ResponseObject(False, "Peer does not exist") + # f, fp = configuration.searchPeer(job['Peer']) + # if not f: + # return ResponseObject(False, "Peer does not exist") s, p = AllPeerJobs.deleteJob(PeerJob( job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], @@ -2851,7 +2618,7 @@ def API_getPeerScheduleJobLogs(configName): requestAll = False if data is not None and data == "true": requestAll = True - return ResponseObject(data=JobLogger.getLogs(requestAll, configName)) + return ResponseObject(data=AllPeerJobs.getPeerJobLogs(configName)) ''' File Download @@ -3226,16 +2993,18 @@ def InitWireguardConfigurationsList(startup: bool = False): except WireguardConfigurations.InvalidConfigurationFileException as e: print(f"{i} have an invalid configuration file.") -AllPeerShareLinks: PeerShareLinks = PeerShareLinks(DashboardConfig) -AllPeerJobs: PeerJobs = PeerJobs() -JobLogger: PeerJobLogger = PeerJobLogger(CONFIGURATION_PATH, AllPeerJobs, DashboardConfig) -DashboardLogger: DashboardLogger = DashboardLogger(CONFIGURATION_PATH, DashboardConfig) + _, app_ip = DashboardConfig.GetConfig("Server", "app_ip") _, app_port = DashboardConfig.GetConfig("Server", "app_port") _, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path") WireguardConfigurations: dict[str, WireguardConfiguration] = {} AmneziaWireguardConfigurations: dict[str, AmneziaWireguardConfiguration] = {} + +AllPeerShareLinks: PeerShareLinks = PeerShareLinks(DashboardConfig) +AllPeerJobs: PeerJobs = PeerJobs(DashboardConfig, WireguardConfigurations) +DashboardLogger: DashboardLogger = DashboardLogger(CONFIGURATION_PATH, DashboardConfig) + InitWireguardConfigurationsList(startup=True) def startThreads(): diff --git a/src/modules/PeerJobLogger.py b/src/modules/PeerJobLogger.py index 9ec035c..ce56660 100644 --- a/src/modules/PeerJobLogger.py +++ b/src/modules/PeerJobLogger.py @@ -1,14 +1,12 @@ """ Peer Job Logger """ -import os, uuid +import uuid import sqlalchemy as db from .Log import Log -from datetime import datetime -from sqlalchemy_utils import database_exists, create_database class PeerJobLogger: - def __init__(self, CONFIGURATION_PATH, AllPeerJobs, DashboardConfig): + def __init__(self, AllPeerJobs, DashboardConfig): self.engine = db.create_engine(DashboardConfig.getConnectionString("wgdashboard_log")) self.metadata = db.MetaData() self.jobLogTable = db.Table('JobLog', self.metadata, @@ -40,7 +38,7 @@ class PeerJobLogger: return False return True - def getLogs(self, all: bool = False, configName = None) -> list[Log]: + def getLogs(self, configName = None) -> list[Log]: logs: list[Log] = [] try: allJobs = self.AllPeerJobs.getAllJobs(configName) diff --git a/src/modules/PeerJobs.py b/src/modules/PeerJobs.py new file mode 100644 index 0000000..b1ea98c --- /dev/null +++ b/src/modules/PeerJobs.py @@ -0,0 +1,190 @@ +""" +Peer Jobs +""" +from .PeerJob import PeerJob +from .PeerJobLogger import PeerJobLogger +import sqlalchemy as db +from datetime import datetime + +class PeerJobs: + def __init__(self, DashboardConfig, WireguardConfigurations): + self.Jobs: list[PeerJob] = [] + self.engine = db.create_engine(DashboardConfig.getConnectionString('wgdashboard_job')) + self.metadata = db.MetaData() + self.peerJobTable = db.Table('PeerJobs', self.metadata, + db.Column('JobID', db.String, nullable=False, primary_key=True), + db.Column('Configuration', db.String, nullable=False), + db.Column('Peer', db.String, nullable=False), + db.Column('Field', db.String, nullable=False), + db.Column('Operator', db.String, nullable=False), + db.Column('Value', db.String, nullable=False), + db.Column('CreationDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP), nullable=False), + db.Column('ExpireDate', (db.DATETIME if DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite' else db.TIMESTAMP)), + db.Column('Action', db.String, nullable=False), + ) + self.metadata.create_all(self.engine) + self.__getJobs() + self.JobLogger: PeerJobLogger = PeerJobLogger(self, DashboardConfig) + self.WireguardConfigurations = WireguardConfigurations + + def __getJobs(self): + self.Jobs.clear() + with self.engine.connect() as conn: + jobs = conn.execute(self.peerJobTable.select().where( + self.peerJobTable.columns.ExpireDate == None + )).mappings().fetchall() + for job in jobs: + self.Jobs.append(PeerJob( + job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], + job['CreationDate'], job['ExpireDate'], job['Action'])) + + def getAllJobs(self, configuration: str = None): + if configuration is not None: + with self.engine.connect() as conn: + jobs = conn.execute(self.peerJobTable.select().where( + self.peerJobTable.columns.Configuration == configuration + )).mappings().fetchall() + j = [] + for job in jobs: + j.append(PeerJob( + job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'], + job['CreationDate'], job['ExpireDate'], job['Action'])) + return j + return [] + + def toJson(self): + return [x.toJson() for x in self.Jobs] + + def searchJob(self, Configuration: str, Peer: str): + return list(filter(lambda x: x.Configuration == Configuration and x.Peer == Peer, self.Jobs)) + + def searchJobById(self, JobID): + return list(filter(lambda x: x.JobID == JobID, self.Jobs)) + + def saveJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]: + import traceback + try: + with self.engine.begin() as conn: + currentJob = self.searchJobById(Job.JobID) + if len(currentJob) == 0: + conn.execute( + self.peerJobTable.insert().values( + { + "JobID": Job.JobID, + "Configuration": Job.Configuration, + "Peer": Job.Peer, + "Field": Job.Field, + "Operator": Job.Operator, + "Value": Job.Value, + "CreationDate": datetime.now(), + "ExpireDate": None, + "Action": Job.Action + } + ) + ) + self.JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") + else: + conn.execute( + self.peerJobTable.update().values({ + "Field": Job.Field, + "Operator": Job.Operator, + "Value": Job.Value, + "Action": Job.Action + }).where(self.peerJobTable.columns.JobID == Job.JobID) + ) + self.JobLogger.log(Job.JobID, Message=f"Job is updated from if {currentJob[0].Field} {currentJob[0].Operator} {currentJob[0].Value} then {currentJob[0].Action}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}") + self.__getJobs() + return True, list( + filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID, + self.Jobs)) + except Exception as e: + traceback.print_exc() + return False, str(e) + + def deleteJob(self, Job: PeerJob) -> tuple[bool, None] | tuple[bool, str]: + try: + if len(self.searchJobById(Job.JobID)) == 0: + return False, "Job does not exist" + with self.engine.begin() as conn: + conn.execute( + self.peerJobTable.update().values( + { + "ExpireDate": datetime.now() + } + ).where(self.peerJobTable.columns.JobID == Job.JobID) + ) + self.JobLogger.log(Job.JobID, Message=f"Job is removed due to being deleted or finshed.") + self.__getJobs() + return True, None + except Exception as e: + return False, str(e) + + def updateJobConfigurationName(self, ConfigurationName: str, NewConfigurationName: str) -> tuple[bool, str] | tuple[bool, None]: + try: + with self.engine.begin() as conn: + conn.execute( + self.peerJobTable.update().values({ + "Configuration": NewConfigurationName + }).where(self.peerJobTable.columns.Configuration == ConfigurationName) + ) + self.__getJobs() + return True, None + except Exception as e: + return False, str(e) + + def getPeerJobLogs(self, configurationName): + return self.JobLogger.getLogs(configurationName) + + + def runJob(self): + needToDelete = [] + self.__getJobs() + for job in self.Jobs: + c = self.WireguardConfigurations.get(job.Configuration) + if c is not None: + f, fp = c.searchPeer(job.Peer) + if f: + if job.Field in ["total_receive", "total_sent", "total_data"]: + s = job.Field.split("_")[1] + x: float = getattr(fp, f"total_{s}") + getattr(fp, f"cumu_{s}") + y: float = float(job.Value) + else: + x: datetime = datetime.now() + y: datetime = datetime.strptime(job.Value, "%Y-%m-%d %H:%M:%S") + runAction: bool = self.__runJob_Compare(x, y, job.Operator) + if runAction: + s = False + if job.Action == "restrict": + s = c.restrictPeers([fp.id]).get_json() + elif job.Action == "delete": + s = c.deletePeers([fp.id]).get_json() + + if s['status'] is True: + self.JobLogger.log(job.JobID, s["status"], + f"Peer {fp.id} from {c.Name} is successfully {job.Action}ed." + ) + needToDelete.append(job) + else: + self.JobLogger.log(job.JobID, s["status"], + f"Peer {fp.id} from {c.Name} failed {job.Action}ed." + ) + else: + self.JobLogger.log(job.JobID, False, + f"Somehow can't find this peer {job.Peer} from {c.Name} failed {job.Action}ed." + ) + else: + self.JobLogger.log(job.JobID, False, + f"Somehow can't find this peer {job.Peer} from {job.Configuration} failed {job.Action}ed." + ) + for j in needToDelete: + self.deleteJob(j) + + def __runJob_Compare(self, x: float | datetime, y: float | datetime, operator: str): + if operator == "eq": + return x == y + if operator == "neq": + return x != y + if operator == "lgt": + return x > y + if operator == "lst": + return x < y \ No newline at end of file diff --git a/src/static/app/src/components/configurationComponents/peerJobsLogsModal.vue b/src/static/app/src/components/configurationComponents/peerJobsLogsModal.vue index 6c33150..f35e667 100644 --- a/src/static/app/src/components/configurationComponents/peerJobsLogsModal.vue +++ b/src/static/app/src/components/configurationComponents/peerJobsLogsModal.vue @@ -37,7 +37,10 @@ export default { getLogs(){ return this.data .filter(x => { - return (this.showSuccessJob && x.Status === "1") || (this.showFailedJob && x.Status === "0") + + return (this.showSuccessJob && + ["1", "true"].includes(x.Status)) || (this.showFailedJob && + ["0", "false"].includes(x.Status)) }) }, showLogs(){