From a9bfcac7710ace51b66e9a09f9c9ddb6136b0081 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 16 Mar 2026 19:53:05 -0300 Subject: [PATCH] enhance security by sanitizing headers and preventing open redirects --- .../auth_gateway/web/auth_routes.py | 15 +++++++---- .../auth_gateway/web/login_routes.py | 25 ++++++++++++------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/containers/auth-gateway/auth_gateway/web/auth_routes.py b/containers/auth-gateway/auth_gateway/web/auth_routes.py index 6b2ffc3..5e45837 100644 --- a/containers/auth-gateway/auth_gateway/web/auth_routes.py +++ b/containers/auth-gateway/auth_gateway/web/auth_routes.py @@ -1,3 +1,5 @@ +import re + from auth_gateway.services.policy_engine import evaluate_ip_access, extract_client_ip from auth_gateway.services.resolver import resolve_request_context from auth_gateway.web.dependencies import ( @@ -51,15 +53,18 @@ async def auth_check(request: Request): login_url = build_external_url(request, "/login", next=context.path) return RedirectResponse(login_url, status_code=302) + def _safe_header(value: str) -> str: + return re.sub(r"[\r\n\x00]", "", value) + response = PlainTextResponse("OK", status_code=200) if session: if session.username: - response.headers["X-Auth-User"] = session.username + response.headers["X-Auth-User"] = _safe_header(session.username) if session.email: - response.headers["X-Auth-Email"] = session.email + response.headers["X-Auth-Email"] = _safe_header(session.email) if session.groups: - response.headers["X-Auth-Groups"] = ",".join(session.groups) + response.headers["X-Auth-Groups"] = _safe_header(",".join(session.groups)) if session.auth_factors: - response.headers["X-Auth-Factors"] = ",".join(session.auth_factors) - response.headers["X-Auth-Policy"] = effective_policy.name + response.headers["X-Auth-Factors"] = _safe_header(",".join(session.auth_factors)) + response.headers["X-Auth-Policy"] = _safe_header(effective_policy.name) return response diff --git a/containers/auth-gateway/auth_gateway/web/login_routes.py b/containers/auth-gateway/auth_gateway/web/login_routes.py index 96066a2..0db5d3f 100644 --- a/containers/auth-gateway/auth_gateway/web/login_routes.py +++ b/containers/auth-gateway/auth_gateway/web/login_routes.py @@ -69,7 +69,7 @@ async def login_page(request: Request, next: str = "/"): if effective_policy.mode == "deny": return _render(request, "error.html", status_code=403, title="Access denied", message="This route is blocked by policy.") if effective_policy.mode == "bypass": - return RedirectResponse(next, status_code=303) + return RedirectResponse(context.path, status_code=303) if session and session_is_allowed(session, effective_policy): current_factors = set(session.auth_factors) @@ -79,11 +79,11 @@ async def login_page(request: Request, next: str = "/"): current_factors.add("ip") missing_factors = [factor for factor in effective_policy.required_factors if factor not in current_factors] if not missing_factors: - return RedirectResponse(next, status_code=303) + return RedirectResponse(context.path, status_code=303) if missing_factors == ["totp"]: - return RedirectResponse(build_external_url(request, "/login/totp", next=next), status_code=303) + return RedirectResponse(build_external_url(request, "/login/totp", next=context.path), status_code=303) if missing_factors == ["oidc"]: - return RedirectResponse(build_external_url(request, "/login/oidc/start", next=next), status_code=303) + return RedirectResponse(build_external_url(request, "/login/oidc/start", next=context.path), status_code=303) available_methods = [] if effective_policy.password_method_names: @@ -94,16 +94,16 @@ async def login_page(request: Request, next: str = "/"): available_methods.append("totp") if available_methods == ["password"]: - return RedirectResponse(build_external_url(request, "/login/password", next=next), status_code=303) + return RedirectResponse(build_external_url(request, "/login/password", next=context.path), status_code=303) if available_methods == ["oidc"]: - return RedirectResponse(build_external_url(request, "/login/oidc/start", next=next), status_code=303) + return RedirectResponse(build_external_url(request, "/login/oidc/start", next=context.path), status_code=303) if available_methods == ["totp"]: - return RedirectResponse(build_external_url(request, "/login/totp", next=next), status_code=303) + return RedirectResponse(build_external_url(request, "/login/totp", next=context.path), status_code=303) return _render( request, "login.html", - next=next, + next=context.path, methods=available_methods, application_name=context.application.name, policy_name=context.policy_name, @@ -265,13 +265,20 @@ async def login_oidc_callback(request: Request, state: str): return _redirect_with_cookie(request, oidc_state.next_url, session) +def _safe_redirect_path(url: str | None) -> str: + """Accept only relative paths to prevent open redirects.""" + if not url or "://" in url or not url.startswith("/"): + return "/" + return url + + def _do_logout(request: Request, next_url: str = "/") -> RedirectResponse: session_cookie = request.cookies.get(request.app.state.settings.cookie_name) session = request.app.state.session_service.get_session(session_cookie) request.app.state.session_service.delete_session(session_cookie) if session: logger.info("AUTH logout for '%s'", session.username or session.email or "unknown") - response = RedirectResponse(next_url or "/", status_code=303) + response = RedirectResponse(_safe_redirect_path(next_url), status_code=303) response.delete_cookie(request.app.state.settings.cookie_name, path="/") return response