Finished forgot password for clients app

This commit is contained in:
Donald Zou
2025-09-05 15:48:11 +08:00
parent 41975973dc
commit 44af7eba11
10 changed files with 401 additions and 25 deletions

View File

@@ -1,5 +1,6 @@
import datetime
import hashlib
import random
import uuid
import bcrypt
@@ -304,12 +305,46 @@ class DashboardClients:
def GetClientAssignedPeers(self, ClientID):
return self.DashboardClientsPeerAssignment.GetAssignedPeers(ClientID)
def ResetClientPassword(self, ClientID, NewPassword, ConfirmNewPassword) -> tuple[bool, str] | tuple[bool, None]:
c = self.GetClient(ClientID)
if c is None:
return False, "Client does not exist"
if NewPassword != ConfirmNewPassword:
return False, "New passwords does not match"
pwStrength, msg = ValidatePasswordStrength(NewPassword)
if not pwStrength:
return pwStrength, msg
try:
with self.engine.begin() as conn:
conn.execute(
self.dashboardClientsTable.update().values({
"TotpKeyVerified": None,
"TotpKey": pyotp.random_base32(),
"Password": bcrypt.hashpw(NewPassword.encode('utf-8'), bcrypt.gensalt()).decode("utf-8"),
}).where(
self.dashboardClientsTable.c.ClientID == ClientID
)
)
self.logger.log(Message=f"User {ClientID} reset password and TOTP")
except Exception as e:
self.logger.log(Status="false", Message=f"User {ClientID} reset password failed, reason: {str(e)}")
return False, "Reset password failed."
return True, None
def UpdateClientPassword(self, Email, CurrentPassword, NewPassword, ConfirmNewPassword):
def UpdateClientPassword(self, ClientID, CurrentPassword, NewPassword, ConfirmNewPassword) -> tuple[bool, str] | tuple[bool, None]:
c = self.GetClient(ClientID)
if c is None:
return False, "Client does not exist"
if not all([CurrentPassword, NewPassword, ConfirmNewPassword]):
return False, "Please fill in all fields"
if not self.SignIn_ValidatePassword(Email, CurrentPassword):
if not self.SignIn_ValidatePassword(c.get('Email'), CurrentPassword):
return False, "Current password does not match"
if NewPassword != ConfirmNewPassword:
@@ -324,13 +359,13 @@ class DashboardClients:
self.dashboardClientsTable.update().values({
"Password": bcrypt.hashpw(NewPassword.encode('utf-8'), bcrypt.gensalt()).decode("utf-8"),
}).where(
self.dashboardClientsTable.c.Email == Email
self.dashboardClientsTable.c.ClientID == ClientID
)
)
self.logger.log(Message=f"User {Email} updated password")
self.logger.log(Message=f"User {ClientID} updated password")
except Exception as e:
self.logger.log(Status="false", Message=f"Signed up failed, reason: {str(e)}")
return False, "Signed up failed."
self.logger.log(Status="false", Message=f"User {ClientID} update password failed, reason: {str(e)}")
return False, "Update password failed."
return True, None
def UpdateClientProfile(self, ClientID, Name):
@@ -381,16 +416,17 @@ class DashboardClients:
For WGDashboard Admin to Manage Clients
'''
def GenerateClientPasswordResetLink(self, ClientID) -> bool | str:
def GenerateClientPasswordResetToken(self, ClientID) -> bool | str:
c = self.GetClient(ClientID)
if c is None:
return False
newToken = str(uuid.uuid4())
newToken = str(random.randint(0, 999999)).zfill(6)
with self.engine.begin() as conn:
conn.execute(
self.dashboardClientsPasswordResetLinkTable.update().values({
"ExpiryDate": db.func.now()
"ExpiryDate": datetime.datetime.now()
}).where(
db.and_(
self.dashboardClientsPasswordResetLinkTable.c.ClientID == ClientID,
@@ -402,13 +438,38 @@ class DashboardClients:
self.dashboardClientsPasswordResetLinkTable.insert().values({
"ResetToken": newToken,
"ClientID": ClientID,
"CreatedDate": datetime.datetime.now(),
"ExpiryDate": datetime.datetime.now() + datetime.timedelta(minutes=30)
})
)
return newToken
def ValidateClientPasswordResetToken(self, ClientID, Token):
c = self.GetClient(ClientID)
if c is None:
return False
with self.engine.connect() as conn:
t = conn.execute(
self.dashboardClientsPasswordResetLinkTable.select().where(
self.dashboardClientsPasswordResetLinkTable.c.ClientID == ClientID,
self.dashboardClientsPasswordResetLinkTable.c.ResetToken == Token,
self.dashboardClientsPasswordResetLinkTable.c.ExpiryDate > datetime.datetime.now()
)
).mappings().fetchone()
return t is not None
def RevokeClientPasswordResetToken(self, ClientID, Token):
with self.engine.begin() as conn:
conn.execute(
self.dashboardClientsPasswordResetLinkTable.update().values({
"ExpiryDate": datetime.datetime.now()
}).where(
self.dashboardClientsPasswordResetLinkTable.c.ClientID == ClientID,
self.dashboardClientsPasswordResetLinkTable.c.ResetToken == Token
)
)
return True
def GetAssignedPeerClients(self, ConfigurationName, PeerID):
c = self.DashboardClientsPeerAssignment.GetAssignedClients(ConfigurationName, PeerID)

View File

@@ -47,7 +47,7 @@ class DashboardConfig:
"dashboard_sort": "status",
"dashboard_theme": "dark",
"dashboard_api_key": "false",
"dashboard_language": "en"
"dashboard_language": "en-US"
},
"Peers": {
"peer_global_DNS": "1.1.1.1",

View File

@@ -124,7 +124,6 @@ class DashboardOIDC:
for k in providers.keys():
if all([providers[k]['client_id'], providers[k]['client_secret'], providers[k]['issuer']]):
try:
print("Requesting " + f"{providers[k]['issuer'].strip('/')}/.well-known/openid-configuration")
oidc_config = requests.get(
f"{providers[k]['issuer'].strip('/')}/.well-known/openid-configuration",
timeout=3,
@@ -136,8 +135,9 @@ class DashboardOIDC:
'openid_configuration': oidc_config
}
self.provider_secret[k] = providers[k]['client_secret']
current_app.logger.info(f"Registered OIDC Provider: {k}")
except Exception as e:
current_app.logger.error("Failed to request OIDC config for this provider: " + providers[k]['issuer'].strip('/'), exc_info=e)
current_app.logger.error(f"Failed to register OIDC config for {k}", exc_info=e)
except Exception as e:
current_app.logger.error('Read OIDC file failed. Reason: ' + str(e))
return False

View File

@@ -35,7 +35,7 @@ class EmailSender:
def ready(self):
return all([self.Server(), self.Port(), self.Encryption(), self.Username(), self.Password(), self.SendFrom()])
def send(self, receiver, subject, body, includeAttachment = False, attachmentName = ""):
def send(self, receiver, subject, body, includeAttachment = False, attachmentName = "") -> tuple[bool, str] | tuple[bool, None]:
if self.ready():
try:
self.smtp = smtplib.SMTP(self.Server(), port=int(self.Port()))