mirror of
https://github.com/eduardogsilva/wireguard_webadmin.git
synced 2026-03-15 13:36:18 +00:00
398 lines
13 KiB
Python
398 lines
13 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Reads JSON config files exported by Django and generates:
|
||
|
|
- /etc/caddy/Caddyfile
|
||
|
|
- /authelia_config/configuration.yml
|
||
|
|
- /authelia_config/users_database.yml
|
||
|
|
|
||
|
|
Expected input files in /caddy_json_export/:
|
||
|
|
- wireguard_webadmin.json (required, generated on container startup)
|
||
|
|
- applications.json (optional, exported from Django)
|
||
|
|
- auth_policies.json (optional, exported from Django)
|
||
|
|
- routes.json (optional, exported from Django)
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import secrets
|
||
|
|
import string
|
||
|
|
|
||
|
|
import yaml
|
||
|
|
|
||
|
|
JSON_DIR = os.environ.get("JSON_DIR", "/caddy_json_export")
|
||
|
|
CADDYFILE_PATH = os.environ.get("CADDYFILE_PATH", "/etc/caddy/Caddyfile")
|
||
|
|
AUTHELIA_CONFIG_DIR = os.environ.get("AUTHELIA_CONFIG_DIR", "/authelia_config")
|
||
|
|
AUTHELIA_CONFIG_PATH = os.path.join(AUTHELIA_CONFIG_DIR, "configuration.yml")
|
||
|
|
AUTHELIA_USERS_PATH = os.path.join(AUTHELIA_CONFIG_DIR, "users_database.yml")
|
||
|
|
AUTHELIA_SECRETS_DIR = os.path.join(AUTHELIA_CONFIG_DIR, "secrets")
|
||
|
|
|
||
|
|
AUTHELIA_INTERNAL_URL = "http://authelia:9091"
|
||
|
|
AUTHELIA_PORTAL_PATH = "/authelia"
|
||
|
|
|
||
|
|
|
||
|
|
def load_json(filename):
|
||
|
|
filepath = os.path.join(JSON_DIR, filename)
|
||
|
|
if not os.path.exists(filepath):
|
||
|
|
return None
|
||
|
|
with open(filepath, "r", encoding="utf-8") as json_file:
|
||
|
|
return json.load(json_file)
|
||
|
|
|
||
|
|
|
||
|
|
def generate_secret(length=64):
|
||
|
|
alphabet = string.ascii_letters + string.digits
|
||
|
|
return "".join(secrets.choice(alphabet) for char_index in range(length))
|
||
|
|
|
||
|
|
|
||
|
|
def get_or_create_secret(name):
|
||
|
|
os.makedirs(AUTHELIA_SECRETS_DIR, exist_ok=True)
|
||
|
|
secret_path = os.path.join(AUTHELIA_SECRETS_DIR, name)
|
||
|
|
if os.path.exists(secret_path):
|
||
|
|
with open(secret_path, "r", encoding="utf-8") as secret_file:
|
||
|
|
return secret_file.read().strip()
|
||
|
|
secret_value = generate_secret()
|
||
|
|
with open(secret_path, "w", encoding="utf-8") as secret_file:
|
||
|
|
secret_file.write(secret_value)
|
||
|
|
return secret_value
|
||
|
|
|
||
|
|
|
||
|
|
def collect_all_applications():
|
||
|
|
"""Merge entries from wireguard_webadmin.json and applications.json."""
|
||
|
|
apps = []
|
||
|
|
|
||
|
|
webadmin_data = load_json("wireguard_webadmin.json")
|
||
|
|
if webadmin_data:
|
||
|
|
apps.extend(webadmin_data.get("entries", []))
|
||
|
|
|
||
|
|
applications_data = load_json("applications.json")
|
||
|
|
if applications_data:
|
||
|
|
apps.extend(applications_data.get("entries", []))
|
||
|
|
|
||
|
|
return apps
|
||
|
|
|
||
|
|
|
||
|
|
def build_caddyfile(apps, auth_policies, routes):
|
||
|
|
lines = []
|
||
|
|
has_authelia = auth_policies is not None
|
||
|
|
|
||
|
|
for app in apps:
|
||
|
|
app_id = app.get("id", "unknown")
|
||
|
|
hosts = app.get("hosts", [])
|
||
|
|
upstream = app.get("upstream", "")
|
||
|
|
static_routes = app.get("static_routes", [])
|
||
|
|
|
||
|
|
if not hosts or not upstream:
|
||
|
|
continue
|
||
|
|
|
||
|
|
host_list = ", ".join(hosts)
|
||
|
|
lines.append(f"{host_list} {{")
|
||
|
|
|
||
|
|
for static_route in static_routes:
|
||
|
|
path_prefix = static_route.get("path_prefix", "")
|
||
|
|
root_dir = static_route.get("root", "")
|
||
|
|
strip_prefix = static_route.get("strip_prefix", "")
|
||
|
|
cache_control = static_route.get("cache_control", "")
|
||
|
|
|
||
|
|
lines.append(f" handle_path {path_prefix}/* {{")
|
||
|
|
lines.append(f" root * {root_dir}")
|
||
|
|
lines.append(f" file_server")
|
||
|
|
if cache_control:
|
||
|
|
lines.append(f" header Cache-Control \"{cache_control}\"")
|
||
|
|
lines.append(f" }}")
|
||
|
|
lines.append("")
|
||
|
|
|
||
|
|
app_routes = {}
|
||
|
|
app_default_policy = None
|
||
|
|
if routes:
|
||
|
|
route_entries = routes.get("entries", {})
|
||
|
|
if app_id in route_entries:
|
||
|
|
app_route_data = route_entries[app_id]
|
||
|
|
app_default_policy = app_route_data.get("default_policy")
|
||
|
|
for route in app_route_data.get("routes", []):
|
||
|
|
app_routes[route.get("path_prefix", "")] = route.get("policy", "")
|
||
|
|
|
||
|
|
needs_auth = False
|
||
|
|
if has_authelia and auth_policies:
|
||
|
|
policies = auth_policies.get("policies", {})
|
||
|
|
if app_default_policy and app_default_policy in policies:
|
||
|
|
policy_data = policies[app_default_policy]
|
||
|
|
if policy_data.get("policy_type") != "bypass":
|
||
|
|
needs_auth = True
|
||
|
|
for path_prefix, policy_name in app_routes.items():
|
||
|
|
if policy_name in policies:
|
||
|
|
policy_data = policies[policy_name]
|
||
|
|
if policy_data.get("policy_type") != "bypass":
|
||
|
|
needs_auth = True
|
||
|
|
|
||
|
|
if needs_auth:
|
||
|
|
for path_prefix, policy_name in app_routes.items():
|
||
|
|
if policy_name in auth_policies.get("policies", {}):
|
||
|
|
policy_data = auth_policies["policies"][policy_name]
|
||
|
|
if policy_data.get("policy_type") == "bypass":
|
||
|
|
lines.append(f" @bypass_{_sanitize_id(path_prefix)} path {path_prefix}*")
|
||
|
|
lines.append(f" skip_log @bypass_{_sanitize_id(path_prefix)}")
|
||
|
|
lines.append("")
|
||
|
|
|
||
|
|
lines.append(f" forward_auth {AUTHELIA_INTERNAL_URL} {{")
|
||
|
|
lines.append(f" uri {AUTHELIA_PORTAL_PATH}/api/authz/forward-auth")
|
||
|
|
lines.append(f" copy_headers Remote-User Remote-Groups Remote-Name Remote-Email")
|
||
|
|
lines.append(f" }}")
|
||
|
|
lines.append("")
|
||
|
|
|
||
|
|
lines.append(f" reverse_proxy {upstream}")
|
||
|
|
lines.append(f"}}")
|
||
|
|
lines.append("")
|
||
|
|
|
||
|
|
if has_authelia:
|
||
|
|
server_address = os.environ.get("SERVER_ADDRESS", "localhost")
|
||
|
|
lines.append(f"{server_address} {{")
|
||
|
|
lines.append(f" handle_path {AUTHELIA_PORTAL_PATH}/* {{")
|
||
|
|
lines.append(f" reverse_proxy {AUTHELIA_INTERNAL_URL}")
|
||
|
|
lines.append(f" }}")
|
||
|
|
lines.append(f"}}")
|
||
|
|
lines.append("")
|
||
|
|
|
||
|
|
return "\n".join(lines)
|
||
|
|
|
||
|
|
|
||
|
|
def _sanitize_id(value):
|
||
|
|
return value.strip("/").replace("/", "_").replace("-", "_")
|
||
|
|
|
||
|
|
|
||
|
|
def build_authelia_config(auth_policies, routes, apps):
|
||
|
|
server_address = os.environ.get("SERVER_ADDRESS", "localhost")
|
||
|
|
|
||
|
|
jwt_secret = get_or_create_secret("jwt_secret")
|
||
|
|
session_secret = get_or_create_secret("session_secret")
|
||
|
|
storage_encryption_key = get_or_create_secret("storage_encryption_key")
|
||
|
|
|
||
|
|
config = {
|
||
|
|
"server": {
|
||
|
|
"address": "tcp://0.0.0.0:9091",
|
||
|
|
},
|
||
|
|
"log": {
|
||
|
|
"level": "info",
|
||
|
|
},
|
||
|
|
"jwt_secret": jwt_secret,
|
||
|
|
"authentication_backend": {
|
||
|
|
"file": {
|
||
|
|
"path": "/config/users_database.yml",
|
||
|
|
},
|
||
|
|
},
|
||
|
|
"session": {
|
||
|
|
"secret": session_secret,
|
||
|
|
"cookies": [
|
||
|
|
{
|
||
|
|
"domain": server_address,
|
||
|
|
"authelia_url": f"https://{server_address}{AUTHELIA_PORTAL_PATH}",
|
||
|
|
"default_redirection_url": f"https://{server_address}",
|
||
|
|
},
|
||
|
|
],
|
||
|
|
},
|
||
|
|
"storage": {
|
||
|
|
"encryption_key": storage_encryption_key,
|
||
|
|
"local": {
|
||
|
|
"path": "/data/db.sqlite3",
|
||
|
|
},
|
||
|
|
},
|
||
|
|
"notifier": {
|
||
|
|
"filesystem": {
|
||
|
|
"filename": "/data/notification.txt",
|
||
|
|
},
|
||
|
|
},
|
||
|
|
"access_control": build_access_control_rules(auth_policies, routes, apps),
|
||
|
|
}
|
||
|
|
|
||
|
|
identity_providers = build_identity_providers(auth_policies, server_address)
|
||
|
|
if identity_providers:
|
||
|
|
config["identity_providers"] = identity_providers
|
||
|
|
|
||
|
|
return config
|
||
|
|
|
||
|
|
|
||
|
|
def build_access_control_rules(auth_policies, routes, apps):
|
||
|
|
if not auth_policies or not routes:
|
||
|
|
return {"default_policy": "bypass", "rules": []}
|
||
|
|
|
||
|
|
policies = auth_policies.get("policies", {})
|
||
|
|
route_entries = routes.get("entries", {})
|
||
|
|
|
||
|
|
host_map = {}
|
||
|
|
for app in apps:
|
||
|
|
app_id = app.get("id", "unknown")
|
||
|
|
host_map[app_id] = app.get("hosts", [])
|
||
|
|
|
||
|
|
rules = []
|
||
|
|
|
||
|
|
for app_id, route_data in route_entries.items():
|
||
|
|
app_hosts = host_map.get(app_id, [])
|
||
|
|
if not app_hosts:
|
||
|
|
continue
|
||
|
|
|
||
|
|
domain_list = list(app_hosts)
|
||
|
|
|
||
|
|
for route in route_data.get("routes", []):
|
||
|
|
policy_name = route.get("policy", "")
|
||
|
|
path_prefix = route.get("path_prefix", "")
|
||
|
|
|
||
|
|
if policy_name not in policies:
|
||
|
|
continue
|
||
|
|
|
||
|
|
policy_data = policies[policy_name]
|
||
|
|
policy_type = policy_data.get("policy_type", "bypass")
|
||
|
|
|
||
|
|
if policy_type == "bypass":
|
||
|
|
authelia_policy = "bypass"
|
||
|
|
elif policy_type == "deny":
|
||
|
|
authelia_policy = "deny"
|
||
|
|
else:
|
||
|
|
authelia_policy = "two_factor"
|
||
|
|
|
||
|
|
rule = {
|
||
|
|
"domain": domain_list,
|
||
|
|
"policy": authelia_policy,
|
||
|
|
"resources": [f"^{path_prefix}.*$"],
|
||
|
|
}
|
||
|
|
|
||
|
|
groups = policy_data.get("groups", [])
|
||
|
|
if groups and authelia_policy not in ("bypass", "deny"):
|
||
|
|
rule["subject"] = [f"group:{g}" for g in groups]
|
||
|
|
|
||
|
|
rules.append(rule)
|
||
|
|
|
||
|
|
default_policy_name = route_data.get("default_policy")
|
||
|
|
if default_policy_name and default_policy_name in policies:
|
||
|
|
default_policy_data = policies[default_policy_name]
|
||
|
|
default_type = default_policy_data.get("policy_type", "bypass")
|
||
|
|
|
||
|
|
if default_type == "bypass":
|
||
|
|
authelia_default = "bypass"
|
||
|
|
elif default_type == "deny":
|
||
|
|
authelia_default = "deny"
|
||
|
|
else:
|
||
|
|
authelia_default = "two_factor"
|
||
|
|
|
||
|
|
default_rule = {
|
||
|
|
"domain": domain_list,
|
||
|
|
"policy": authelia_default,
|
||
|
|
}
|
||
|
|
|
||
|
|
groups = default_policy_data.get("groups", [])
|
||
|
|
if groups and authelia_default not in ("bypass", "deny"):
|
||
|
|
default_rule["subject"] = [f"group:{g}" for g in groups]
|
||
|
|
|
||
|
|
rules.append(default_rule)
|
||
|
|
|
||
|
|
return {
|
||
|
|
"default_policy": "deny",
|
||
|
|
"rules": rules,
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def build_identity_providers(auth_policies, server_address):
|
||
|
|
if not auth_policies:
|
||
|
|
return None
|
||
|
|
|
||
|
|
auth_methods = auth_policies.get("auth_methods", {})
|
||
|
|
oidc_clients = []
|
||
|
|
|
||
|
|
for method_name, method in auth_methods.items():
|
||
|
|
if method.get("type") != "oidc":
|
||
|
|
continue
|
||
|
|
|
||
|
|
client_id = method.get("client_id", "")
|
||
|
|
client_secret = method.get("client_secret", "")
|
||
|
|
if not client_id:
|
||
|
|
continue
|
||
|
|
|
||
|
|
client = {
|
||
|
|
"client_id": client_id,
|
||
|
|
"client_secret": client_secret,
|
||
|
|
"authorization_policy": "two_factor",
|
||
|
|
"redirect_uris": [
|
||
|
|
f"https://{server_address}{AUTHELIA_PORTAL_PATH}/oidc/callback",
|
||
|
|
],
|
||
|
|
"scopes": ["openid", "profile", "email", "groups"],
|
||
|
|
}
|
||
|
|
oidc_clients.append(client)
|
||
|
|
|
||
|
|
if not oidc_clients:
|
||
|
|
return None
|
||
|
|
|
||
|
|
hmac_secret = get_or_create_secret("oidc_hmac_secret")
|
||
|
|
|
||
|
|
return {
|
||
|
|
"oidc": {
|
||
|
|
"hmac_secret": hmac_secret,
|
||
|
|
"clients": oidc_clients,
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def build_users_database(auth_policies):
|
||
|
|
if not auth_policies:
|
||
|
|
return {"users": {}}
|
||
|
|
|
||
|
|
users_data = auth_policies.get("users", {})
|
||
|
|
groups_data = auth_policies.get("groups", {})
|
||
|
|
|
||
|
|
user_groups = {}
|
||
|
|
for group_name, group_info in groups_data.items():
|
||
|
|
for username in group_info.get("users", []):
|
||
|
|
if username not in user_groups:
|
||
|
|
user_groups[username] = []
|
||
|
|
user_groups[username].append(group_name)
|
||
|
|
|
||
|
|
users = {}
|
||
|
|
for username, user_info in users_data.items():
|
||
|
|
user_entry = {
|
||
|
|
"displayname": username,
|
||
|
|
"email": user_info.get("email", f"{username}@localhost"),
|
||
|
|
"groups": user_groups.get(username, []),
|
||
|
|
}
|
||
|
|
password_hash = user_info.get("password_hash", "")
|
||
|
|
if password_hash:
|
||
|
|
user_entry["password"] = password_hash
|
||
|
|
|
||
|
|
users[username] = user_entry
|
||
|
|
|
||
|
|
return {"users": users}
|
||
|
|
|
||
|
|
|
||
|
|
class _NoAliasDumper(yaml.SafeDumper):
|
||
|
|
"""YAML dumper that never emits anchors/aliases."""
|
||
|
|
def ignore_aliases(self, data):
|
||
|
|
return True
|
||
|
|
|
||
|
|
|
||
|
|
def write_yaml(filepath, data):
|
||
|
|
os.makedirs(os.path.dirname(filepath), exist_ok=True)
|
||
|
|
with open(filepath, "w", encoding="utf-8") as yaml_file:
|
||
|
|
yaml.dump(data, yaml_file, Dumper=_NoAliasDumper, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
apps = collect_all_applications()
|
||
|
|
auth_policies = load_json("auth_policies.json")
|
||
|
|
routes = load_json("routes.json")
|
||
|
|
|
||
|
|
caddyfile_content = build_caddyfile(apps, auth_policies, routes)
|
||
|
|
os.makedirs(os.path.dirname(CADDYFILE_PATH), exist_ok=True)
|
||
|
|
with open(CADDYFILE_PATH, "w", encoding="utf-8") as caddyfile:
|
||
|
|
caddyfile.write(caddyfile_content)
|
||
|
|
print(f"Caddyfile written to {CADDYFILE_PATH}")
|
||
|
|
|
||
|
|
if auth_policies:
|
||
|
|
authelia_config = build_authelia_config(auth_policies, routes, apps)
|
||
|
|
write_yaml(AUTHELIA_CONFIG_PATH, authelia_config)
|
||
|
|
print(f"Authelia configuration written to {AUTHELIA_CONFIG_PATH}")
|
||
|
|
|
||
|
|
users_db = build_users_database(auth_policies)
|
||
|
|
write_yaml(AUTHELIA_USERS_PATH, users_db)
|
||
|
|
print(f"Authelia users database written to {AUTHELIA_USERS_PATH}")
|
||
|
|
else:
|
||
|
|
print("No auth_policies.json found, skipping Authelia config generation.")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|