import os import json import requests from jose import jwt import certifi class DashboardOIDC: ConfigurationPath = os.getenv('CONFIGURATION_PATH', '.') ConfigurationFilePath = os.path.join(ConfigurationPath, 'wg-dashboard-oidc-providers.json') def __init__(self): self.providers: dict[str, dict] = None self.__default = { 'Provider': { 'client_id': '', 'client_secret': '', 'issuer': '', }, } if not os.path.exists(DashboardOIDC.ConfigurationFilePath): with open(DashboardOIDC.ConfigurationFilePath, "w+") as f: encoder = json.JSONEncoder(indent=4) f.write(encoder.encode(self.__default)) self.ReadFile() def GetProviders(self): providers = {} for k in self.providers.keys(): if all([self.providers[k]['client_id'], self.providers[k]['client_secret'], self.providers[k]['issuer']]): providers[k] = { 'client_id': self.providers[k]['client_id'], 'issuer': self.providers[k]['issuer'].strip('/') } return providers def VerifyToken(self, provider, code, redirect_uri): if not all([provider, code, redirect_uri]): return False, "" if provider not in self.providers.keys(): return False, "Provider does not exist" provider = self.providers.get(provider) oidc_config = requests.get( f"{provider.get('issuer').strip('/')}/.well-known/openid-configuration", verify=certifi.where() ).json() data = { "grant_type": "authorization_code", "code": code, "redirect_uri": redirect_uri, "client_id": provider.get('client_id'), "client_secret": provider.get('client_secret') } try: tokens = requests.post(oidc_config.get('token_endpoint'), data=data).json() if not all([tokens.get('access_token'), tokens.get('id_token')]): return False, tokens.get('error_description', None) except Exception as e: return False, str(e) id_token = tokens.get('id_token') jwks_uri = oidc_config.get("jwks_uri") issuer = oidc_config.get("issuer") jwks = requests.get(jwks_uri, verify=certifi.where()).json() print(jwks) headers = jwt.get_unverified_header(id_token) kid = headers["kid"] key = next(k for k in jwks["keys"] if k["kid"] == kid) payload = jwt.decode( id_token, key, algorithms=[key["alg"]], audience=provider.get('client_id'), issuer=issuer ) return True, payload def ReadFile(self): decoder = json.JSONDecoder() self.providers = decoder.decode( open(DashboardOIDC.ConfigurationFilePath, 'r').read() )