Files
WGDashboard/src/modules/DashboardClients.py

144 lines
5.9 KiB
Python
Raw Normal View History

import hashlib
import uuid
import bcrypt
import pyotp
2025-06-02 12:04:01 +08:00
import sqlalchemy as db
from .ConnectionString import ConnectionString
from .DashboardClientsTOTP import DashboardClientsTOTP
from .Utilities import ValidatePasswordStrength
from .DashboardLogger import DashboardLogger
2025-06-02 12:04:01 +08:00
class DashboardClients:
    """Account store for dashboard clients: sign-up, sign-in and TOTP checks.

    Owns two tables in the "wgdashboard" database (created on construction
    if missing): ``DashboardClients`` (credentials, TOTP key, created/deleted
    timestamps) and ``DashboardClientsInfo`` (first/last name).
    """

    def __init__(self):
        """Create the engine, declare and create the tables, load clients."""
        self.logger = DashboardLogger()
        # Resolve the connection string once; it is needed both for the
        # engine and for picking the date column type below.
        connectionString = ConnectionString("wgdashboard")
        self.engine = db.create_engine(connectionString)
        self.metadata = db.MetaData()
        # SQLite connections use DATETIME, every other backend TIMESTAMP.
        dateColumnType = db.DATETIME if 'sqlite:///' in connectionString else db.TIMESTAMP

        self.dashboardClientsTable = db.Table(
            'DashboardClients', self.metadata,
            db.Column('ClientID', db.String(255), nullable=False, primary_key=True),
            db.Column('Email', db.String(255), nullable=False, index=True),
            db.Column('Password', db.String(500)),
            db.Column('TotpKey', db.String(500)),
            db.Column('TotpKeyVerified', db.Integer),
            db.Column('CreatedDate', dateColumnType, server_default=db.func.now()),
            db.Column('DeletedDate', dateColumnType),
            extend_existing=True,
        )
        self.dashboardClientsInfoTable = db.Table(
            'DashboardClientsInfo', self.metadata,
            db.Column('ClientID', db.String(255), nullable=False, primary_key=True),
            db.Column('Firstname', db.String(500)),
            db.Column('Lastname', db.String(500)),
            extend_existing=True,
        )
        self.metadata.create_all(self.engine)
        self.Clients = []
        self.__getClients()
        self.DashboardClientsTOTP = DashboardClientsTOTP()

    def __getClients(self):
        """Reload ``self.Clients`` with every client whose DeletedDate is NULL.

        Each entry is a row mapping with ClientID, Email and CreatedDate.
        """
        with self.engine.connect() as conn:
            self.Clients = conn.execute(
                db.select(
                    self.dashboardClientsTable.c.ClientID,
                    self.dashboardClientsTable.c.Email,
                    self.dashboardClientsTable.c.CreatedDate
                ).where(
                    # Must be the SQL-level IS NULL: a Python `is None` on a
                    # Column is evaluated immediately to False and the query
                    # would never return a row.
                    self.dashboardClientsTable.c.DeletedDate.is_(None)
                )
            ).mappings().fetchall()

    def SignIn(self, Email, Password) -> tuple[bool, str]:
        """Check an email/password pair against the stored bcrypt hash.

        Returns:
            ``(True, token)`` where ``token`` is whatever
            ``DashboardClientsTOTP.GenerateToken`` returns for the client,
            or ``(False, message)`` when a field is empty or the credentials
            do not match.
        """
        if not all([Email, Password]):
            return False, "Please fill in all fields"
        with self.engine.connect() as conn:
            # NOTE(review): this lookup does not filter on DeletedDate —
            # confirm whether soft-deleted accounts should be able to sign in.
            existingClient = conn.execute(
                self.dashboardClientsTable.select().where(
                    self.dashboardClientsTable.c.Email == Email
                )
            ).mappings().fetchone()
            if existingClient:
                storedHash = existingClient.get("Password")
                # The Password column is nullable; a row without a hash can
                # never match and must not crash on .encode().
                if storedHash and bcrypt.checkpw(Password.encode("utf-8"), storedHash.encode("utf-8")):
                    return True, self.DashboardClientsTOTP.GenerateToken(existingClient.get("ClientID"))
        return False, "Email or Password is incorrect"

    def SignIn_GetTotp(self, Token: str, UserProvidedTotp: str = None) -> "tuple[bool, str | None]":
        """Second sign-in step: hand out the TOTP setup URI or verify a code.

        Args:
            Token: token previously issued by ``SignIn``; resolved through
                ``DashboardClientsTOTP.GetTotp``.
            UserProvidedTotp: the code typed by the user, or ``None`` to only
                query the enrolment state.

        Returns:
            * ``(False, message)`` if the token is invalid or the code does
              not match.
            * ``(True, provisioning_uri)`` if no code was supplied and the
              client's TOTP key has not been verified yet.
            * ``(True, None)`` otherwise (code accepted, or no code supplied
              and the key is already verified).
        """
        status, data = self.DashboardClientsTOTP.GetTotp(Token)
        if not status:
            return False, "TOTP Token is invalid"
        if UserProvidedTotp is None:
            if data.get('TotpKeyVerified') is None:
                return True, pyotp.totp.TOTP(data.get('TotpKey')).provisioning_uri(name=data.get('Email'),
                                                                                   issuer_name="WGDashboard Client")
        else:
            totpMatched = pyotp.TOTP(data.get('TotpKey')).verify(UserProvidedTotp)
            if not totpMatched:
                return False, "TOTP is does not match"
            # First successful code confirms the enrolment; persist that.
            if data.get('TotpKeyVerified') is None:
                with self.engine.begin() as conn:
                    conn.execute(
                        self.dashboardClientsTable.update().values({
                            'TotpKeyVerified': 1
                        }).where(
                            self.dashboardClientsTable.c.ClientID == data.get('ClientID')
                        )
                    )
        return True, None

    def SignUp(self, Email, Password, ConfirmPassword) -> "tuple[bool, str | None]":
        """Register a new client with a bcrypt-hashed password and a TOTP key.

        Validation order: all fields present, passwords equal, email not
        already present, then ``ValidatePasswordStrength``. On success one
        row is inserted into each table inside a single transaction.

        Returns:
            ``(True, None)`` on success, otherwise ``(False, message)``.
            Any exception is logged and reported as "Signed up failed.".
        """
        try:
            if not all([Email, Password, ConfirmPassword]):
                return False, "Please fill in all fields"
            if Password != ConfirmPassword:
                return False, "Passwords does not match"

            with self.engine.connect() as conn:
                existingClient = conn.execute(
                    self.dashboardClientsTable.select().where(
                        self.dashboardClientsTable.c.Email == Email
                    )
                ).mappings().fetchone()
                if existingClient:
                    return False, "Email already signed up"

            pwStrength, msg = ValidatePasswordStrength(Password)
            if not pwStrength:
                return pwStrength, msg

            # engine.begin() commits both inserts together or rolls both back.
            with self.engine.begin() as conn:
                newClientUUID = str(uuid.uuid4())
                totpKey = pyotp.random_base32()
                encodePassword = Password.encode('utf-8')
                conn.execute(
                    self.dashboardClientsTable.insert().values({
                        "ClientID": newClientUUID,
                        "Email": Email,
                        "Password": bcrypt.hashpw(encodePassword, bcrypt.gensalt()).decode("utf-8"),
                        "TotpKey": totpKey
                    })
                )
                conn.execute(
                    self.dashboardClientsInfoTable.insert().values({
                        "ClientID": newClientUUID
                    })
                )
        except Exception as e:
            self.logger.log(Status="false", Message=f"Signed up failed, reason: {str(e)}")
            return False, "Signed up failed."

        return True, None