Files
WGDashboard/src/modules/PeerJobLogger.py
2025-12-02 14:27:51 +08:00

101 lines
4.1 KiB
Python

"""
Peer Job Logger
"""
import uuid
from typing import Sequence
import sqlalchemy as db
from flask import current_app
from sqlalchemy import RowMapping
from .ConnectionString import ConnectionString
from .Log import Log
class PeerJobLogger:
    """Store and query peer job log entries in the ``JobLog`` table.

    The table lives in the database addressed by
    ``ConnectionString("wgdashboard_log")`` and is created on construction
    if it does not exist yet. Each row holds a generated ``LogID``, the
    ``JobID`` it belongs to, a server-side ``LogDate`` timestamp, a
    ``Status`` value and a free-text ``Message``.
    """

    # A job is reported by getFailingJobs() once it has MORE than this many
    # failed log rows.
    _FAILING_JOB_THRESHOLD = 10

    def __init__(self, AllPeerJobs, DashboardConfig):
        """Create the engine, declare the ``JobLog`` table and ensure it exists.

        :param AllPeerJobs: object providing ``getAllJobs(configName)``; it is
            used by :meth:`getLogs` to decide which job IDs to look up.
        :param DashboardConfig: configuration object; only
            ``GetConfig("Database", "type")`` is read here.
        """
        self.engine = db.create_engine(ConnectionString("wgdashboard_log"))
        self.metadata = db.MetaData()

        # Element [1] of the GetConfig result is compared against the backend
        # name (presumably a (found, value) pair -- confirm against
        # DashboardConfig). SQLite gets DATETIME, every other backend TIMESTAMP.
        usesSqlite = DashboardConfig.GetConfig("Database", "type")[1] == 'sqlite'
        logDateType = db.DATETIME if usesSqlite else db.TIMESTAMP

        self.jobLogTable = db.Table(
            'JobLog', self.metadata,
            db.Column('LogID', db.String(255), nullable=False, primary_key=True),
            db.Column('JobID', db.String(255), nullable=False),
            db.Column('LogDate', logDateType, server_default=db.func.now()),
            db.Column('Status', db.String(255), nullable=False),
            db.Column('Message', db.Text),
        )
        self.logs: list[Log] = []
        self.metadata.create_all(self.engine)
        self.AllPeerJobs = AllPeerJobs

    def log(self, JobID: str, Status: bool = True, Message: str = "") -> bool:
        """Insert one log row for ``JobID``.

        :param JobID: identifier of the job the entry belongs to.
        :param Status: outcome flag stored in the ``Status`` column.
        :param Message: optional free-text detail.
        :return: ``True`` when the row was committed, ``False`` when the
            insert raised (the error is logged, not propagated).
        """
        entry = {
            "LogID": str(uuid.uuid4()),
            "JobID": JobID,
            "Status": Status,
            "Message": Message,
        }
        try:
            # engine.begin() commits on success and rolls back on error.
            with self.engine.begin() as conn:
                conn.execute(self.jobLogTable.insert().values(entry))
        except Exception as e:
            # Pass the exception through a %s placeholder; handing it over as
            # a bare extra argument makes the logging module fail to format
            # the record and the actual error text is lost.
            current_app.logger.error("Peer Job Log Error: %s", e)
            return False
        return True

    def getLogs(self, configName = None) -> list[Log]:
        """Return the log entries of every job known to ``AllPeerJobs``.

        :param configName: forwarded unchanged to
            ``AllPeerJobs.getAllJobs``.
        :return: list of ``Log`` objects with ``LogDate`` formatted as
            ``YYYY-MM-DD HH:MM:SS``. On any error the problem is logged and
            the entries collected so far (possibly none) are returned.
        """
        logs: list[Log] = []
        try:
            jobIDs = [job.JobID for job in self.AllPeerJobs.getAllJobs(configName)]
            if not jobIDs:
                # Nothing to look up; skip the round trip with an empty IN ().
                return logs

            stmt = self.jobLogTable.select().where(
                self.jobLogTable.c.JobID.in_(jobIDs)
            )
            with self.engine.connect() as conn:
                rows = conn.execute(stmt).fetchall()

            for row in rows:
                logs.append(
                    Log(
                        row.LogID,
                        row.JobID,
                        row.LogDate.strftime("%Y-%m-%d %H:%M:%S"),
                        row.Status,
                        row.Message,
                    )
                )
        except Exception as e:
            current_app.logger.error("Getting Peer Job Log Error: %s", e)
        return logs

    def getFailingJobs(self) -> Sequence[RowMapping]:
        """Return the ``JobID`` of every job with too many failed log rows.

        A row counts as failed when ``Status`` equals ``'false'``; on SQLite
        a ``Status`` equal to ``0`` is accepted as well. Jobs whose number of
        failed rows exceeds ``_FAILING_JOB_THRESHOLD`` are returned.

        :return: row mappings, each with a single ``JobID`` key.
        """
        status = self.jobLogTable.c.Status
        jobID = self.jobLogTable.c.JobID

        failed = status == 'false'
        if self.engine.dialect.name == 'sqlite':
            failed = db.or_(failed, status == 0)

        stmt = (
            db.select(jobID)
            .where(failed)
            .group_by(jobID)
            .having(db.func.count(jobID) > self._FAILING_JOB_THRESHOLD)
        )
        with self.engine.connect() as conn:
            return conn.execute(stmt).mappings().fetchall()

    def deleteLogs(self, LogID = None, JobID = None):
        """Delete log rows matching the given identifiers.

        :param LogID: when not ``None``, restrict deletion to this log entry.
        :param JobID: when not ``None``, restrict deletion to this job.

        Both filters are combined with AND. When both are ``None`` no filter
        is applied and every row in the table is deleted.
        """
        stmt = self.jobLogTable.delete()
        if LogID is not None:
            stmt = stmt.where(self.jobLogTable.c.LogID == LogID)
        if JobID is not None:
            stmt = stmt.where(self.jobLogTable.c.JobID == JobID)

        with self.engine.begin() as conn:
            conn.execute(stmt)
            # Report only after the DELETE went through, so a failing
            # statement is not announced as a successful clean-up.
            print(f"[WGDashboard] Deleted stale logs of JobID: {JobID}")

    def vacuum(self):
        """Run ``VACUUM`` on the log database when the backend is SQLite.

        Does nothing for any other backend.
        """
        if self.engine.dialect.name != 'sqlite':
            return

        print("[WGDashboard] SQLite Vacuuming PeerJogLogs Database")
        # SQLite refuses to VACUUM inside an open transaction, so run the
        # statement on an AUTOCOMMIT connection instead of relying on the
        # driver not emitting BEGIN.
        autocommitEngine = self.engine.execution_options(isolation_level="AUTOCOMMIT")
        with autocommitEngine.connect() as conn:
            conn.execute(db.text('VACUUM;'))