Files
WGDashboard/src/modules/DashboardClients.py
2025-08-20 15:10:02 +08:00

276 lines
12 KiB
Python

import hashlib
import uuid
import bcrypt
import pyotp
import sqlalchemy as db
import requests
from .ConnectionString import ConnectionString
from .DashboardClientsPeerAssignment import DashboardClientsPeerAssignment
from .DashboardClientsTOTP import DashboardClientsTOTP
from .DashboardOIDC import DashboardOIDC
from .Utilities import ValidatePasswordStrength
from .DashboardLogger import DashboardLogger
from flask import session
class DashboardClients:
def __init__(self, wireguardConfigurations):
self.logger = DashboardLogger()
self.engine = db.create_engine(ConnectionString("wgdashboard"))
self.metadata = db.MetaData()
self.OIDC = DashboardOIDC()
self.dashboardClientsTable = db.Table(
'DashboardClients', self.metadata,
db.Column('ClientID', db.String(255), nullable=False, primary_key=True),
db.Column('Email', db.String(255), nullable=False, index=True),
db.Column('Password', db.String(500)),
db.Column('TotpKey', db.String(500)),
db.Column('TotpKeyVerified', db.Integer),
db.Column('CreatedDate',
(db.DATETIME if 'sqlite:///' in ConnectionString("wgdashboard") else db.TIMESTAMP),
server_default=db.func.now()),
db.Column('DeletedDate',
(db.DATETIME if 'sqlite:///' in ConnectionString("wgdashboard") else db.TIMESTAMP)),
extend_existing=True,
)
self.dashboardOIDCClientsTable = db.Table(
'DashboardOIDCClients', self.metadata,
db.Column('ClientID', db.String(255), nullable=False, primary_key=True),
db.Column('Email', db.String(255), nullable=False, index=True),
db.Column('ProviderIssuer', db.String(500), nullable=False, index=True),
db.Column('ProviderSubject', db.String(500), nullable=False, index=True),
db.Column('CreatedDate',
(db.DATETIME if 'sqlite:///' in ConnectionString("wgdashboard") else db.TIMESTAMP),
server_default=db.func.now()),
extend_existing=True,
)
self.dashboardClientsInfoTable = db.Table(
'DashboardClientsInfo', self.metadata,
db.Column('ClientID', db.String(255), nullable=False, primary_key=True),
db.Column('Name', db.String(500)),
extend_existing=True,
)
self.metadata.create_all(self.engine)
self.Clients = []
self.__getClients()
self.DashboardClientsTOTP = DashboardClientsTOTP()
self.DashboardClientsPeerAssignment = DashboardClientsPeerAssignment(wireguardConfigurations)
def __getClients(self):
with self.engine.connect() as conn:
self.Clients = conn.execute(
db.select(
self.dashboardClientsTable.c.ClientID,
self.dashboardClientsTable.c.Email,
self.dashboardClientsTable.c.CreatedDate
).where(
self.dashboardClientsTable.c.DeletedDate is None)
).mappings().fetchall()
def GetClientProfile(self, ClientID):
with self.engine.connect() as conn:
return dict(conn.execute(
db.select(
*[c for c in self.dashboardClientsInfoTable.c if c.name != 'ClientID']
).where(
self.dashboardClientsInfoTable.c.ClientID == ClientID
)
).mappings().fetchone())
def SignIn_ValidatePassword(self, Email, Password) -> bool:
if not all([Email, Password]):
return False
existingClient = self.SignIn_UserExistence(Email)
if existingClient:
return bcrypt.checkpw(Password.encode("utf-8"), existingClient.get("Password").encode("utf-8"))
return False
def SignIn_UserExistence(self, Email):
with self.engine.connect() as conn:
existingClient = conn.execute(
self.dashboardClientsTable.select().where(
self.dashboardClientsTable.c.Email == Email
)
).mappings().fetchone()
return existingClient
def SignIn_OIDC_UserExistence(self, data: dict[str, str]):
with self.engine.connect() as conn:
existingClient = conn.execute(
self.dashboardOIDCClientsTable.select().where(
db.and_(
self.dashboardOIDCClientsTable.c.ProviderIssuer == data.get('iss'),
self.dashboardOIDCClientsTable.c.ProviderSubject == data.get('sub'),
)
)
).mappings().fetchone()
return existingClient
def SignUp_OIDC(self, data: dict[str, str]) -> tuple[bool, str] | tuple[bool, None]:
if not self.SignIn_OIDC_UserExistence(data):
with self.engine.begin() as conn:
newClientUUID = str(uuid.uuid4())
conn.execute(
self.dashboardOIDCClientsTable.insert().values({
"ClientID": newClientUUID,
"Email": data.get('email', ''),
"ProviderIssuer": data.get('iss', ''),
"ProviderSubject": data.get('sub', '')
})
)
conn.execute(
self.dashboardClientsInfoTable.insert().values({
"ClientID": newClientUUID,
"Name": data.get("name")
})
)
self.logger.log(Message=f"User {data.get('email', '')} from {data.get('iss', '')} signed up")
return True, newClientUUID
return False, "User already signed up"
def SignOut_OIDC(self):
sessionPayload = session.get('OIDCPayload')
status, oidc_config = self.OIDC.GetProviderConfiguration(session.get('SignInPayload').get("Provider"))
signOut = requests.get(
oidc_config.get("end_session_endpoint"),
params={
'id_token_hint': session.get('SignInPayload').get("Payload").get('sid')
}
)
return True
def SignIn_OIDC(self, **kwargs):
status, data = self.OIDC.VerifyToken(**kwargs)
if not status:
return False, "Sign in failed. Reason: " + data
existingClient = self.SignIn_OIDC_UserExistence(data)
print(data)
if not existingClient:
status, newClientUUID = self.SignUp_OIDC(data)
session['ClientID'] = newClientUUID
else:
session['ClientID'] = existingClient.get("ClientID")
session['SignInMethod'] = 'OIDC'
session['SignInPayload'] = {
"Provider": kwargs.get('provider'),
"Payload": data
}
return True, data
def SignIn(self, Email, Password) -> tuple[bool, str]:
if not all([Email, Password]):
return False, "Please fill in all fields"
existingClient = self.SignIn_UserExistence(Email)
if existingClient:
checkPwd = self.SignIn_ValidatePassword(Email, Password)
if checkPwd:
session['SignInMethod'] = 'local'
session['Email'] = Email
session['ClientID'] = existingClient.get("ClientID")
return True, self.DashboardClientsTOTP.GenerateToken(existingClient.get("ClientID"))
return False, "Email or Password is incorrect"
def SignIn_GetTotp(self, Token: str, UserProvidedTotp: str = None) -> tuple[bool, str] or tuple[bool, None, str]:
status, data = self.DashboardClientsTOTP.GetTotp(Token)
if not status:
return False, "TOTP Token is invalid"
if UserProvidedTotp is None:
if data.get('TotpKeyVerified') is None:
return True, pyotp.totp.TOTP(data.get('TotpKey')).provisioning_uri(name=data.get('Email'),
issuer_name="WGDashboard Client")
else:
totpMatched = pyotp.totp.TOTP(data.get('TotpKey')).verify(UserProvidedTotp)
if not totpMatched:
return False, "TOTP is does not match"
else:
self.DashboardClientsTOTP.RevokeToken(Token)
if data.get('TotpKeyVerified') is None:
with self.engine.begin() as conn:
conn.execute(
self.dashboardClientsTable.update().values({
'TotpKeyVerified': 1
}).where(
self.dashboardClientsTable.c.ClientID == data.get('ClientID')
)
)
return True, None
def SignUp(self, Email, Password, ConfirmPassword) -> tuple[bool, str] or tuple[bool, None]:
try:
if not all([Email, Password, ConfirmPassword]):
return False, "Please fill in all fields"
if Password != ConfirmPassword:
return False, "Passwords does not match"
existingClient = self.SignIn_UserExistence(Email)
if existingClient:
return False, "Email already signed up"
pwStrength, msg = ValidatePasswordStrength(Password)
if not pwStrength:
return pwStrength, msg
with self.engine.begin() as conn:
newClientUUID = str(uuid.uuid4())
totpKey = pyotp.random_base32()
encodePassword = Password.encode('utf-8')
conn.execute(
self.dashboardClientsTable.insert().values({
"ClientID": newClientUUID,
"Email": Email,
"Password": bcrypt.hashpw(encodePassword, bcrypt.gensalt()).decode("utf-8"),
"TotpKey": totpKey,
"AuthType": "local",
"AuthSrc": "local"
})
)
conn.execute(
self.dashboardClientsInfoTable.insert().values({
"ClientID": newClientUUID
})
)
self.logger.log(Message=f"User {Email} signed up")
except Exception as e:
self.logger.log(Status="false", Message=f"Signed up failed, reason: {str(e)}")
return False, "Signed up failed."
return True, None
def GetClientAssignedPeers(self, ClientID):
return self.DashboardClientsPeerAssignment.GetAssignedPeers(ClientID)
def UpdateClientPassword(self, Email, CurrentPassword, NewPassword, ConfirmNewPassword):
if not all([CurrentPassword, NewPassword, ConfirmNewPassword]):
return False, "Please fill in all fields"
if not self.SignIn_ValidatePassword(Email, CurrentPassword):
return False, "Current password does not match"
if NewPassword != ConfirmNewPassword:
return False, "New passwords does not match"
pwStrength, msg = ValidatePasswordStrength(NewPassword)
if not pwStrength:
return pwStrength, msg
try:
with self.engine.begin() as conn:
conn.execute(
self.dashboardClientsTable.update().values({
"Password": bcrypt.hashpw(NewPassword.encode('utf-8'), bcrypt.gensalt()).decode("utf-8"),
}).where(
self.dashboardClientsTable.c.Email == Email
)
)
self.logger.log(Message=f"User {Email} updated password")
except Exception as e:
self.logger.log(Status="false", Message=f"Signed up failed, reason: {str(e)}")
return False, "Signed up failed."
return True, None