OIDC should be good to go

This commit is contained in:
Donald Zou
2025-07-03 19:20:01 +08:00
parent 5ac84e109d
commit 77b156c7f5
5 changed files with 109 additions and 53 deletions

View File

@@ -10,6 +10,7 @@ class DashboardOIDC:
ConfigurationFilePath = os.path.join(ConfigurationPath, 'wg-dashboard-oidc-providers.json')
def __init__(self):
self.providers: dict[str, dict] = {}
self.provider_secret: dict[str, str] = {}
self.__default = {
'Provider': {
'client_id': '',
@@ -26,52 +27,36 @@ class DashboardOIDC:
self.ReadFile()
def GetProviders(self):
providers = {}
for k in self.providers.keys():
if all([self.providers[k]['client_id'], self.providers[k]['client_secret'], self.providers[k]['issuer']]):
try:
print("Requesting " + f"{self.providers[k]['issuer'].strip('/')}/.well-known/openid-configuration")
oidc_config = requests.get(
f"{self.providers[k]['issuer'].strip('/')}/.well-known/openid-configuration",
verify=certifi.where()
).json()
providers[k] = {
'client_id': self.providers[k]['client_id'],
'issuer': self.providers[k]['issuer'].strip('/')
}
except Exception as e:
current_app.logger.error("Failed to request OIDC config for this provider: " + self.providers[k]['issuer'].strip('/'), exc_info=e)
return providers
return self.providers
def VerifyToken(self, provider, code, redirect_uri):
try:
if not all([provider, code, redirect_uri]):
return False, ""
return False, "Please provide all parameters"
if provider not in self.providers.keys():
return False, "Provider does not exist"
provider = self.providers.get(provider)
oidc_config = requests.get(
f"{provider.get('issuer').strip('/')}/.well-known/openid-configuration",
verify=certifi.where()
).json()
secrete = self.provider_secret.get(provider)
oidc_config_status, oidc_config = self.GetProviderConfiguration(provider)
provider_info = self.providers.get(provider)
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": provider.get('client_id'),
"client_secret": provider.get('client_secret')
"client_id": provider_info.get('client_id'),
"client_secret": secrete
}
try:
tokens = requests.post(oidc_config.get('token_endpoint'), data=data).json()
if not all([tokens.get('access_token'), tokens.get('id_token')]):
print(oidc_config.get('token_endpoint'), data)
return False, tokens.get('error_description', None)
except Exception as e:
print(str(e))
return False, str(e)
access_token = tokens.get('access_token')
@@ -84,31 +69,58 @@ class DashboardOIDC:
kid = headers["kid"]
key = next(k for k in jwks["keys"] if k["kid"] == kid)
print(key)
payload = jwt.decode(
id_token,
key,
algorithms=[key["alg"]],
audience=provider.get('client_id'),
audience=provider_info.get('client_id'),
issuer=issuer,
access_token=access_token
)
print(payload)
return True, payload
except Exception as e:
current_app.logger.error('Read OIDC file failed. Reason: ' + str(e), provider, code, redirect_uri)
return False, str(e)
def GetProviderConfiguration(self, provider_name):
if not all([provider_name]):
return False, None
provider = self.providers.get(provider_name)
try:
oidc_config = requests.get(
f"{provider.get('issuer').strip('/')}/.well-known/openid-configuration",
verify=certifi.where()
).json()
except Exception as e:
current_app.logger.error("Failed to get OpenID Configuration of " + provider.get('issuer'), exc_info=e)
return False, None
return True, oidc_config
def ReadFile(self):
decoder = json.JSONDecoder()
try:
self.providers = decoder.decode(
providers = decoder.decode(
open(DashboardOIDC.ConfigurationFilePath, 'r').read()
)
print(self.providers)
for k in providers.keys():
if all([providers[k]['client_id'], providers[k]['client_secret'], providers[k]['issuer']]):
try:
print("Requesting " + f"{providers[k]['issuer'].strip('/')}/.well-known/openid-configuration")
oidc_config = requests.get(
f"{providers[k]['issuer'].strip('/')}/.well-known/openid-configuration",
timeout=3,
verify=certifi.where()
).json()
self.providers[k] = {
'client_id': providers[k]['client_id'],
'issuer': providers[k]['issuer'].strip('/'),
'openid_configuration': oidc_config
}
self.provider_secret[k] = providers[k]['client_secret']
except Exception as e:
current_app.logger.error("Failed to request OIDC config for this provider: " + providers[k]['issuer'].strip('/'), exc_info=e)
except Exception as e:
current_app.logger.error('Read OIDC file failed. Reason: ' + str(e))
return False