From 6b70701c9c35d58d55273ebb780730f9ae31f9fe Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 15 Mar 2026 11:37:25 -0300 Subject: [PATCH] refactor Caddyfile generation to improve policy handling and cookie collection --- containers/caddy/process_config.py | 105 +++++++++++++++++++++++------ 1 file changed, 83 insertions(+), 22 deletions(-) diff --git a/containers/caddy/process_config.py b/containers/caddy/process_config.py index 0f77206..0a7013f 100644 --- a/containers/caddy/process_config.py +++ b/containers/caddy/process_config.py @@ -73,6 +73,12 @@ def collect_all_applications(): def build_caddyfile(apps, auth_policies, routes): lines = [] has_authelia = auth_policies is not None + policies = auth_policies.get("policies", {}) if has_authelia else {} + + def get_policy_type(policy_name): + if policy_name and policy_name in policies: + return policies[policy_name].get("policy_type", "bypass") + return "bypass" for app in apps: app_id = app.get("id", "unknown") @@ -116,27 +122,61 @@ def build_caddyfile(apps, auth_policies, routes): for route in app_route_data.get("routes", []): app_routes[route.get("path_prefix", "")] = route.get("policy", "") + default_policy_type = get_policy_type(app_default_policy) + + # When the default policy is deny, use handle blocks for specific + # non-deny routes and a catch-all respond 403 at Caddy level, + # avoiding an unnecessary Authelia round-trip. + if has_authelia and default_policy_type == "deny": + for path_prefix, policy_name in app_routes.items(): + ptype = get_policy_type(policy_name) + if ptype == "bypass": + lines.append(f" handle {path_prefix}/* {{") + lines.append(f" reverse_proxy {upstream}") + lines.append(f" }}") + lines.append("") + elif ptype == "deny": + lines.append(f" handle {path_prefix}/* {{") + lines.append(f" respond 403") + lines.append(f" }}") + lines.append("") + else: + lines.append(f" handle {path_prefix}/* {{") + lines.append(f" forward_auth {AUTHELIA_INTERNAL_URL} {{") + lines.append(f" uri {AUTHELIA_PORTAL_PATH}/api/authz/forward-auth") + lines.append(f" copy_headers Remote-User Remote-Groups Remote-Name Remote-Email") + lines.append(f" }}") + lines.append(f" reverse_proxy {upstream}") + lines.append(f" }}") + lines.append("") + lines.append(f" respond 403") + lines.append(f"}}") + lines.append("") + continue + + # For bypass/protected default policy: emit explicit deny blocks for + # any per-route deny entries before the forward_auth check. + for path_prefix, policy_name in app_routes.items(): + if get_policy_type(policy_name) == "deny": + lines.append(f" handle {path_prefix}/* {{") + lines.append(f" respond 403") + lines.append(f" }}") + lines.append("") + needs_auth = False if has_authelia and auth_policies: - policies = auth_policies.get("policies", {}) - if app_default_policy and app_default_policy in policies: - policy_data = policies[app_default_policy] - if policy_data.get("policy_type") != "bypass": - needs_auth = True + if default_policy_type not in ("bypass", "deny"): + needs_auth = True for path_prefix, policy_name in app_routes.items(): - if policy_name in policies: - policy_data = policies[policy_name] - if policy_data.get("policy_type") != "bypass": - needs_auth = True + if get_policy_type(policy_name) not in ("bypass", "deny"): + needs_auth = True if needs_auth: for path_prefix, policy_name in app_routes.items(): - if policy_name in auth_policies.get("policies", {}): - policy_data = auth_policies["policies"][policy_name] - if policy_data.get("policy_type") == "bypass": - lines.append(f" @bypass_{_sanitize_id(path_prefix)} path {path_prefix}*") - lines.append(f" skip_log @bypass_{_sanitize_id(path_prefix)}") - lines.append("") + if get_policy_type(policy_name) == "bypass": + lines.append(f" @bypass_{_sanitize_id(path_prefix)} path {path_prefix}*") + lines.append(f" skip_log @bypass_{_sanitize_id(path_prefix)}") + lines.append("") lines.append(f" forward_auth {AUTHELIA_INTERNAL_URL} {{") lines.append(f" uri {AUTHELIA_PORTAL_PATH}/api/authz/forward-auth") @@ -155,6 +195,33 @@ def _sanitize_id(value): return value.strip("/").replace("/", "_").replace("-", "_") +def _collect_app_domains(apps, server_address): + """Return session cookie entries for all unique app hostnames. + + Authelia v4.37+ requires a session.cookies entry for every domain managed + via forward_auth. Without it, Authelia does not recognise the domain and + may allow requests through regardless of the access_control policy. + """ + seen = {server_address} + cookies = [ + { + "domain": server_address, + "authelia_url": f"https://{server_address}{AUTHELIA_PORTAL_PATH}", + "default_redirection_url": f"https://{server_address}", + } + ] + for app in apps: + for host in app.get("hosts", []): + if host not in seen: + seen.add(host) + cookies.append({ + "domain": host, + "authelia_url": f"https://{server_address}{AUTHELIA_PORTAL_PATH}", + "default_redirection_url": f"https://{server_address}", + }) + return cookies + + def build_authelia_config(auth_policies, routes, apps): server_address = os.environ.get("SERVER_ADDRESS", "localhost") @@ -181,13 +248,7 @@ def build_authelia_config(auth_policies, routes, apps): }, "session": { "secret": session_secret, - "cookies": [ - { - "domain": server_address, - "authelia_url": f"https://{server_address}{AUTHELIA_PORTAL_PATH}", - "default_redirection_url": f"https://{server_address}", - }, - ], + "cookies": _collect_app_domains(apps, server_address), }, "storage": { "encryption_key": storage_encryption_key,