implement challenge verification flow with altcha integration and add challenge page

This commit is contained in:
Eduardo Silva
2026-03-18 08:56:48 -03:00
parent 0bd4136b5f
commit 5c5375cb9a
10 changed files with 325 additions and 2 deletions

View File

@@ -0,0 +1,67 @@
from urllib.parse import urlsplit
from auth_gateway.services.challenge_service import (
CHALLENGE_COOKIE_NAME,
generate_altcha_challenge,
generate_challenge_cookie,
verify_altcha_solution,
)
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
router = APIRouter()
def _safe_next(url: str | None, external_path: str) -> str:
if not url:
return f"{external_path}/login"
parts = urlsplit(url)
if parts.scheme or parts.netloc:
return f"{external_path}/login"
path = parts.path or "/"
if not path.startswith("/"):
return f"{external_path}/login"
return f"{path}?{parts.query}" if parts.query else path
@router.get("/challenge", response_class=HTMLResponse)
async def challenge_page(request: Request, next: str = "/"):
external_path = request.app.state.settings.external_path
return request.app.state.templates.TemplateResponse(
"challenge.html",
{"request": request, "external_path": external_path, "next": next},
)
@router.get("/challenge/data")
async def challenge_data(request: Request):
hmac_key = request.app.state.settings.challenge_secret
challenge = generate_altcha_challenge(hmac_key)
return JSONResponse(challenge)
@router.post("/challenge/verify")
async def challenge_verify(request: Request, next: str = Form("/"), altcha: str = Form(...)):
hmac_key = request.app.state.settings.challenge_secret
external_path = request.app.state.settings.external_path
if not verify_altcha_solution(altcha, hmac_key):
return request.app.state.templates.TemplateResponse(
"challenge.html",
{"request": request, "external_path": external_path, "next": next, "error": True},
status_code=400,
)
cookie_value = generate_challenge_cookie(hmac_key)
safe_next = _safe_next(next, external_path)
response = RedirectResponse(safe_next, status_code=303)
response.set_cookie(
key=CHALLENGE_COOKIE_NAME,
value=cookie_value,
httponly=True,
secure=request.app.state.settings.secure_cookies,
samesite="strict",
path="/",
max_age=300,
)
return response

View File

@@ -110,3 +110,19 @@ def validate_csrf(request: Request, submitted_token: str | None) -> None:
cookie_token = request.cookies.get(cookie_name)
if not cookie_token or not submitted_token or cookie_token != submitted_token:
raise HTTPException(status_code=403, detail="CSRF validation failed.")
def challenge_is_valid(request: Request) -> bool:
from auth_gateway.services.challenge_service import CHALLENGE_COOKIE_NAME, verify_challenge_cookie
cookie = request.cookies.get(CHALLENGE_COOKIE_NAME)
if not cookie:
return False
return verify_challenge_cookie(cookie, request.app.state.settings.challenge_secret)
def build_challenge_url(request: Request, login_route: str, next_url: str) -> str:
external_path = request.app.state.settings.external_path.rstrip("/")
login_path = f"{external_path}{login_route}"
if next_url and next_url != "/":
login_path = f"{login_path}?{urlencode({'next': next_url})}"
return build_external_url(request, "/challenge", next=login_path)

View File

@@ -9,7 +9,9 @@ from auth_gateway.services.policy_engine import evaluate_ip_access, extract_clie
from auth_gateway.services.resolver import normalize_host
from auth_gateway.services.totp_service import verify_totp
from auth_gateway.web.dependencies import (
build_challenge_url,
build_external_url,
challenge_is_valid,
get_effective_expiration,
get_effective_policy,
get_oidc_method,
@@ -147,6 +149,8 @@ async def login_page(request: Request, next: str = "/"):
@router.get("/login/password", response_class=HTMLResponse)
async def login_password_page(request: Request, next: str = "/"):
if not challenge_is_valid(request):
return RedirectResponse(build_challenge_url(request, "/login/password", next), status_code=303)
runtime_config = get_runtime_config(request)
context = resolve_context_from_request(request, runtime_config, next)
effective_policy = get_effective_policy(runtime_config, context.policy_name)
@@ -171,6 +175,8 @@ async def login_password_submit(
password: str = Form(...),
csrf_token: str = Form(...),
):
if not challenge_is_valid(request):
return RedirectResponse(build_challenge_url(request, "/login/password", next), status_code=303)
runtime_config = get_runtime_config(request)
context = resolve_context_from_request(request, runtime_config, next)
effective_policy = get_effective_policy(runtime_config, context.policy_name)
@@ -226,6 +232,8 @@ async def login_password_submit(
@router.get("/login/totp", response_class=HTMLResponse)
async def login_totp_page(request: Request, next: str = "/"):
if not challenge_is_valid(request):
return RedirectResponse(build_challenge_url(request, "/login/totp", next), status_code=303)
runtime_config = get_runtime_config(request)
context = resolve_context_from_request(request, runtime_config, next)
effective_policy = get_effective_policy(runtime_config, context.policy_name)
@@ -248,6 +256,8 @@ async def login_totp_page(request: Request, next: str = "/"):
@router.post("/login/totp")
@limiter.limit(AUTH_RATE_LIMIT)
async def login_totp_submit(request: Request, next: str = Form("/"), token: str = Form(...), csrf_token: str = Form(...)):
if not challenge_is_valid(request):
return RedirectResponse(build_challenge_url(request, "/login/totp", next), status_code=303)
runtime_config = get_runtime_config(request)
context = resolve_context_from_request(request, runtime_config, next)
effective_policy = get_effective_policy(runtime_config, context.policy_name)