From a619e7f571ac664dab3de327a32277c3a25b0904 Mon Sep 17 00:00:00 2001 From: Donald Zou Date: Tue, 1 Jul 2025 13:06:16 +0800 Subject: [PATCH] Update DashboardOIDC.py Testing more with OIDC --- src/modules/DashboardOIDC.py | 122 +++++++++++++++++++---------------- 1 file changed, 68 insertions(+), 54 deletions(-) diff --git a/src/modules/DashboardOIDC.py b/src/modules/DashboardOIDC.py index a199eef..100ef25 100644 --- a/src/modules/DashboardOIDC.py +++ b/src/modules/DashboardOIDC.py @@ -3,6 +3,7 @@ import json import requests from jose import jwt import certifi +from flask import current_app class DashboardOIDC: @@ -29,67 +30,80 @@ class DashboardOIDC: providers = {} for k in self.providers.keys(): if all([self.providers[k]['client_id'], self.providers[k]['client_secret'], self.providers[k]['issuer']]): - providers[k] = { - 'client_id': self.providers[k]['client_id'], - 'issuer': self.providers[k]['issuer'].strip('/') - } + try: + oidc_config = requests.get( + f"{self.providers[k]['issuer'].strip('/')}/.well-known/openid-configuration", + verify=certifi.where() + ).json() + providers[k] = { + 'client_id': self.providers[k]['client_id'], + 'issuer': self.providers[k]['issuer'].strip('/') + } + except Exception as e: + current_app.logger.error("Failed to request OIDC config for this provider: " + self.providers[k]['issuer'].strip('/'), exc_info=e) return providers def VerifyToken(self, provider, code, redirect_uri): - if not all([provider, code, redirect_uri]): - return False, "" - - if provider not in self.providers.keys(): - return False, "Provider does not exist" - - provider = self.providers.get(provider) - oidc_config = requests.get( - f"{provider.get('issuer').strip('/')}/.well-known/openid-configuration", - verify=certifi.where() - - ).json() - - data = { - "grant_type": "authorization_code", - "code": code, - "redirect_uri": redirect_uri, - "client_id": provider.get('client_id'), - "client_secret": provider.get('client_secret') - } - try: - tokens = requests.post(oidc_config.get('token_endpoint'), data=data).json() - if not all([tokens.get('access_token'), tokens.get('id_token')]): - return False, tokens.get('error_description', None) - except Exception as e: - return False, str(e) - + if not all([provider, code, redirect_uri]): + return False, "" + + if provider not in self.providers.keys(): + return False, "Provider does not exist" - - id_token = tokens.get('id_token') - jwks_uri = oidc_config.get("jwks_uri") - issuer = oidc_config.get("issuer") - jwks = requests.get(jwks_uri, verify=certifi.where()).json() - print(jwks) - headers = jwt.get_unverified_header(id_token) - kid = headers["kid"] - - key = next(k for k in jwks["keys"] if k["kid"] == kid) - - payload = jwt.decode( - id_token, - key, - algorithms=[key["alg"]], - audience=provider.get('client_id'), - issuer=issuer - ) - - return True, payload + provider = self.providers.get(provider) + oidc_config = requests.get( + f"{provider.get('issuer').strip('/')}/.well-known/openid-configuration", + verify=certifi.where() + + ).json() + + data = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + "client_id": provider.get('client_id'), + "client_secret": provider.get('client_secret') + } + + try: + tokens = requests.post(oidc_config.get('token_endpoint'), data=data).json() + if not all([tokens.get('access_token'), tokens.get('id_token')]): + return False, tokens.get('error_description', None) + except Exception as e: + return False, str(e) + + id_token = tokens.get('id_token') + jwks_uri = oidc_config.get("jwks_uri") + issuer = oidc_config.get("issuer") + jwks = requests.get(jwks_uri, verify=certifi.where()).json() + + headers = jwt.get_unverified_header(id_token) + kid = headers["kid"] + + key = next(k for k in jwks["keys"] if k["kid"] == kid) + + payload = jwt.decode( + id_token, + key, + algorithms=[key["alg"]], + audience=provider.get('client_id'), + issuer=issuer + ) + + return True, payload + except Exception as e: + current_app.logger.error('Read OIDC file failed. Reason: ' + str(e), provider, code, redirect_uri) + return False, str(e) def ReadFile(self): decoder = json.JSONDecoder() - self.providers = decoder.decode( - open(DashboardOIDC.ConfigurationFilePath, 'r').read() - ) \ No newline at end of file + try: + self.providers = decoder.decode( + open(DashboardOIDC.ConfigurationFilePath, 'r').read() + ) + except Exception as e: + current_app.logger.error('Read OIDC file failed. Reason: ' + str(e)) + return False \ No newline at end of file