Files
ProxMenux/AppImage/scripts/oci_manager.py

1261 lines
43 KiB
Python
Raw Normal View History

2026-03-12 21:30:44 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ProxMenux OCI Manager
2026-03-12 23:37:21 +01:00
Manages deployment and lifecycle of OCI container applications using Proxmox VE 9.1+
native LXC from OCI images functionality.
2026-03-12 21:30:44 +01:00
Usage:
- As library: import oci_manager; oci_manager.deploy_app(...)
- As CLI: python oci_manager.py deploy --app-id secure-gateway --config '{...}'
"""
2026-03-12 23:37:21 +01:00
import base64
2026-03-12 21:30:44 +01:00
import json
import logging
import os
2026-03-12 23:37:21 +01:00
import re
import secrets
2026-03-12 21:30:44 +01:00
import shutil
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
2026-03-12 23:37:21 +01:00
# Optional: cryptography for encryption
try:
from cryptography.fernet import Fernet
ENCRYPTION_AVAILABLE = True
except ImportError:
ENCRYPTION_AVAILABLE = False
2026-03-12 21:30:44 +01:00
# Logging
logger = logging.getLogger("proxmenux.oci")
# =================================================================
# Paths
# =================================================================
OCI_BASE_DIR = "/usr/local/share/proxmenux/oci"
CATALOG_FILE = os.path.join(OCI_BASE_DIR, "catalog.json")
INSTALLED_FILE = os.path.join(OCI_BASE_DIR, "installed.json")
INSTANCES_DIR = os.path.join(OCI_BASE_DIR, "instances")
# Source catalog from Scripts (bundled with ProxMenux)
SCRIPTS_CATALOG = "/usr/local/share/proxmenux/scripts/oci/catalog.json"
DEV_SCRIPTS_CATALOG = os.path.join(os.path.dirname(__file__), "..", "..", "Scripts", "oci", "catalog.json")
2026-03-12 23:37:21 +01:00
# Encryption key file
ENCRYPTION_KEY_FILE = os.path.join(OCI_BASE_DIR, ".encryption_key")
# Default storage for templates
DEFAULT_STORAGE = "local"
# VMID range for OCI containers (9000-9999 to avoid conflicts)
OCI_VMID_START = 9000
OCI_VMID_END = 9999
# =================================================================
# Encryption Functions for Sensitive Data
# =================================================================
def _get_or_create_encryption_key() -> bytes:
"""Get or create the encryption key for sensitive data."""
if os.path.exists(ENCRYPTION_KEY_FILE):
with open(ENCRYPTION_KEY_FILE, 'rb') as f:
return f.read()
if ENCRYPTION_AVAILABLE:
key = Fernet.generate_key()
else:
key = secrets.token_bytes(32)
os.makedirs(os.path.dirname(ENCRYPTION_KEY_FILE), exist_ok=True)
with open(ENCRYPTION_KEY_FILE, 'wb') as f:
f.write(key)
os.chmod(ENCRYPTION_KEY_FILE, 0o600)
return key
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
def encrypt_sensitive_value(value: str) -> str:
"""Encrypt a sensitive value. Returns base64-encoded string with 'ENC:' prefix."""
if not value:
return value
key = _get_or_create_encryption_key()
if ENCRYPTION_AVAILABLE:
f = Fernet(key)
encrypted = f.encrypt(value.encode())
return "ENC:" + encrypted.decode()
else:
value_bytes = value.encode()
encrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(value_bytes))
return "ENC:" + base64.b64encode(encrypted).decode()
def decrypt_sensitive_value(encrypted: str) -> str:
"""Decrypt a sensitive value."""
if not encrypted or not encrypted.startswith("ENC:"):
return encrypted
encrypted_data = encrypted[4:]
key = _get_or_create_encryption_key()
try:
if ENCRYPTION_AVAILABLE:
f = Fernet(key)
decrypted = f.decrypt(encrypted_data.encode())
return decrypted.decode()
else:
encrypted_bytes = base64.b64decode(encrypted_data)
decrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(encrypted_bytes))
return decrypted.decode()
except Exception as e:
logger.error(f"Failed to decrypt value: {e}")
return encrypted
def encrypt_config_sensitive_fields(config: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
"""Encrypt sensitive fields in config based on schema."""
encrypted_config = config.copy()
for field_name, field_schema in schema.items():
if field_schema.get("sensitive") and field_name in encrypted_config:
value = encrypted_config[field_name]
if value and not str(value).startswith("ENC:"):
encrypted_config[field_name] = encrypt_sensitive_value(str(value))
return encrypted_config
def decrypt_config_sensitive_fields(config: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
"""Decrypt sensitive fields in config based on schema."""
decrypted_config = config.copy()
for field_name, field_schema in schema.items():
if field_schema.get("sensitive") and field_name in decrypted_config:
value = decrypted_config[field_name]
if value and str(value).startswith("ENC:"):
decrypted_config[field_name] = decrypt_sensitive_value(str(value))
return decrypted_config
# =================================================================
# Directory Management
# =================================================================
2026-03-12 21:30:44 +01:00
def ensure_oci_directories():
2026-03-12 23:37:21 +01:00
"""Ensure OCI directories exist and catalog is available."""
2026-03-12 21:30:44 +01:00
os.makedirs(OCI_BASE_DIR, exist_ok=True)
os.makedirs(INSTANCES_DIR, exist_ok=True)
if not os.path.exists(CATALOG_FILE):
if os.path.exists(SCRIPTS_CATALOG):
shutil.copy2(SCRIPTS_CATALOG, CATALOG_FILE)
elif os.path.exists(DEV_SCRIPTS_CATALOG):
shutil.copy2(DEV_SCRIPTS_CATALOG, CATALOG_FILE)
if not os.path.exists(INSTALLED_FILE):
with open(INSTALLED_FILE, 'w') as f:
json.dump({"version": "1.0.0", "instances": {}}, f, indent=2)
2026-03-12 22:13:56 +01:00
# =================================================================
2026-03-12 23:37:21 +01:00
# Proxmox VE Detection and Compatibility
2026-03-12 22:13:56 +01:00
# =================================================================
2026-03-12 23:37:21 +01:00
def check_proxmox_version() -> Dict[str, Any]:
"""Check Proxmox VE version and OCI support."""
2026-03-12 22:13:56 +01:00
result = {
2026-03-12 23:37:21 +01:00
"is_proxmox": False,
"version": None,
"oci_support": False,
"error": None
2026-03-12 22:13:56 +01:00
}
try:
2026-03-12 23:37:21 +01:00
# Check if pveversion exists
if not shutil.which("pveversion"):
result["error"] = "Not running on Proxmox VE"
return result
2026-03-12 22:13:56 +01:00
2026-03-12 23:37:21 +01:00
proc = subprocess.run(
["pveversion"],
capture_output=True, text=True, timeout=5
)
2026-03-12 22:13:56 +01:00
if proc.returncode != 0:
2026-03-12 23:37:21 +01:00
result["error"] = "Failed to get Proxmox version"
2026-03-12 22:13:56 +01:00
return result
2026-03-12 23:37:21 +01:00
# Parse version: "pve-manager/9.1-2/abc123..."
version_str = proc.stdout.strip()
result["is_proxmox"] = True
2026-03-12 22:13:56 +01:00
2026-03-12 23:37:21 +01:00
match = re.search(r'pve-manager/(\d+)\.(\d+)', version_str)
if match:
major = int(match.group(1))
minor = int(match.group(2))
result["version"] = f"{major}.{minor}"
# OCI support requires Proxmox VE 9.1+
result["oci_support"] = (major > 9) or (major == 9 and minor >= 1)
2026-03-12 22:13:56 +01:00
2026-03-12 23:47:37 +01:00
# Also check if skopeo is available (required for OCI)
if result["oci_support"] and not shutil.which("skopeo"):
result["oci_support"] = False
result["error"] = "skopeo not found. Install with: apt install skopeo"
if not result["oci_support"] and not result.get("error"):
2026-03-12 23:37:21 +01:00
result["error"] = f"OCI support requires Proxmox VE 9.1+, found {result['version']}"
2026-03-12 22:13:56 +01:00
except Exception as e:
2026-03-12 23:37:21 +01:00
result["error"] = str(e)
2026-03-12 22:13:56 +01:00
return result
2026-03-12 23:37:21 +01:00
def _run_pve_cmd(cmd: List[str], timeout: int = 60) -> Tuple[int, str, str]:
"""Run a Proxmox VE command."""
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Command timed out"
except Exception as e:
return -1, "", str(e)
2026-03-12 22:13:56 +01:00
2026-03-12 21:30:44 +01:00
# =================================================================
2026-03-12 23:37:21 +01:00
# VMID Management
2026-03-12 21:30:44 +01:00
# =================================================================
2026-03-12 23:37:21 +01:00
def _get_next_vmid() -> int:
"""Get next available VMID in OCI range (9000-9999)."""
try:
# Get list of all VMIDs in use
rc, out, _ = _run_pve_cmd(["pvesh", "get", "/cluster/resources", "--type", "vm", "--output-format", "json"])
if rc != 0:
# Fallback: try pct list
rc, out, _ = _run_pve_cmd(["pct", "list"])
used_vmids = set()
if rc == 0:
for line in out.splitlines()[1:]: # Skip header
parts = line.split()
if parts:
try:
used_vmids.add(int(parts[0]))
except ValueError:
pass
else:
resources = json.loads(out)
used_vmids = {r.get("vmid") for r in resources if r.get("vmid")}
# Find first available in OCI range
for vmid in range(OCI_VMID_START, OCI_VMID_END + 1):
if vmid not in used_vmids:
return vmid
raise RuntimeError("No available VMID in OCI range (9000-9999)")
except Exception as e:
logger.error(f"Failed to get next VMID: {e}")
# Return a high number and hope for the best
return OCI_VMID_START + int(time.time()) % 1000
def _get_vmid_for_app(app_id: str) -> Optional[int]:
"""Get the VMID for an installed app."""
installed = _load_installed()
instance = installed.get("instances", {}).get(app_id)
return instance.get("vmid") if instance else None
2026-03-13 20:12:35 +01:00
def _find_alpine_template(storage: str = DEFAULT_STORAGE) -> Optional[str]:
"""Find an available Alpine LXC template."""
template_dir = "/var/lib/vz/template/cache"
# Try to get correct path from storage config
rc, out, _ = _run_pve_cmd(["pvesm", "path", f"{storage}:vztmpl/test"])
if rc == 0 and out.strip():
template_dir = os.path.dirname(out.strip())
# Look for Alpine templates
try:
templates = os.listdir(template_dir)
alpine_templates = [t for t in templates if t.startswith("alpine-") and t.endswith((".tar.xz", ".tar.gz", ".tar"))]
if alpine_templates:
# Sort to get latest version
alpine_templates.sort(reverse=True)
return f"{storage}:vztmpl/{alpine_templates[0]}"
except Exception as e:
logger.error(f"Failed to find Alpine template: {e}")
return None
def _install_packages_in_lxc(vmid: int, packages: List[str], install_method: str = "apk") -> bool:
"""Install packages inside an LXC container."""
if not packages:
return True
print(f"[*] Installing packages: {', '.join(packages)}")
if install_method == "apk":
# Alpine Linux
cmd = ["pct", "exec", str(vmid), "--", "apk", "add"] + packages
elif install_method == "apt":
# Debian/Ubuntu
_run_pve_cmd(["pct", "exec", str(vmid), "--", "apt-get", "update"], timeout=120)
cmd = ["pct", "exec", str(vmid), "--", "apt-get", "install", "-y"] + packages
else:
logger.error(f"Unknown install method: {install_method}")
return False
rc, out, err = _run_pve_cmd(cmd, timeout=300)
if rc != 0:
logger.error(f"Failed to install packages: {err}")
return False
print(f"[OK] Packages installed")
return True
def _enable_services_in_lxc(vmid: int, services: List[str], install_method: str = "apk") -> bool:
"""Enable and start services inside an LXC container."""
if not services:
return True
print(f"[*] Enabling services: {', '.join(services)}")
for service in services:
if install_method == "apk":
# Alpine uses OpenRC
_run_pve_cmd(["pct", "exec", str(vmid), "--", "rc-update", "add", service], timeout=30)
_run_pve_cmd(["pct", "exec", str(vmid), "--", "service", service, "start"], timeout=30)
else:
# Debian/Ubuntu uses systemd
_run_pve_cmd(["pct", "exec", str(vmid), "--", "systemctl", "enable", service], timeout=30)
_run_pve_cmd(["pct", "exec", str(vmid), "--", "systemctl", "start", service], timeout=30)
print(f"[OK] Services enabled")
return True
def _configure_tailscale(vmid: int, config: Dict[str, Any]) -> bool:
"""Configure Tailscale inside the container."""
auth_key = config.get("auth_key", "")
if not auth_key:
logger.warning("No auth_key provided for Tailscale")
return False
print(f"[*] Configuring Tailscale...")
# Build tailscale up command
ts_cmd = ["tailscale", "up", f"--authkey={auth_key}"]
hostname = config.get("hostname")
if hostname:
ts_cmd.append(f"--hostname={hostname}")
advertise_routes = config.get("advertise_routes")
if advertise_routes:
if isinstance(advertise_routes, list):
advertise_routes = ",".join(advertise_routes)
ts_cmd.append(f"--advertise-routes={advertise_routes}")
if config.get("exit_node"):
ts_cmd.append("--advertise-exit-node")
if config.get("accept_routes"):
ts_cmd.append("--accept-routes")
# Run tailscale up
rc, out, err = _run_pve_cmd(["pct", "exec", str(vmid), "--"] + ts_cmd, timeout=60)
if rc != 0:
logger.error(f"Tailscale configuration failed: {err}")
print(f"[!] Tailscale error: {err}")
return False
print(f"[OK] Tailscale configured")
return True
2026-03-12 23:37:21 +01:00
# =================================================================
# OCI Image Management
# =================================================================
def pull_oci_image(image: str, tag: str = "latest", storage: str = DEFAULT_STORAGE) -> Dict[str, Any]:
2026-03-12 21:30:44 +01:00
"""
2026-03-12 23:37:21 +01:00
Pull an OCI image from a registry and store as LXC template.
2026-03-13 00:13:52 +01:00
Uses Proxmox's pvesh API to download OCI images (same as GUI).
2026-03-12 23:37:21 +01:00
Args:
image: Image name (e.g., "docker.io/tailscale/tailscale")
tag: Image tag (e.g., "stable")
storage: Proxmox storage to save template
Returns:
Dict with success status and template path
2026-03-12 21:30:44 +01:00
"""
result = {
2026-03-12 23:37:21 +01:00
"success": False,
"message": "",
"template": None
2026-03-12 21:30:44 +01:00
}
2026-03-12 23:37:21 +01:00
# Check Proxmox OCI support
pve_info = check_proxmox_version()
if not pve_info["oci_support"]:
result["message"] = pve_info.get("error", "OCI not supported")
return result
2026-03-12 21:30:44 +01:00
2026-03-13 00:13:52 +01:00
# Normalize image name - ensure full registry path
2026-03-12 23:47:37 +01:00
if not image.startswith(("docker.io/", "ghcr.io/", "quay.io/", "registry.")):
2026-03-12 23:37:21 +01:00
image = f"docker.io/{image}"
2026-03-13 00:13:52 +01:00
# For docker.io, library images need explicit library/ prefix
parts = image.split("/")
if parts[0] == "docker.io" and len(parts) == 2:
image = f"docker.io/library/{parts[1]}"
2026-03-12 23:47:37 +01:00
full_ref = f"{image}:{tag}"
2026-03-12 23:37:21 +01:00
2026-03-12 23:47:37 +01:00
logger.info(f"Pulling OCI image: {full_ref}")
print(f"[*] Pulling OCI image: {full_ref}")
2026-03-12 23:37:21 +01:00
2026-03-13 00:13:52 +01:00
# Create a safe filename from the image reference
2026-03-13 19:19:32 +01:00
# e.g., docker.io/tailscale/tailscale:stable -> tailscale-tailscale-stable.tar
# Note: Use .tar extension (not .tar.zst) - skopeo creates uncompressed tar
filename = image.replace("docker.io/", "").replace("ghcr.io/", "").replace("library/", "").replace("/", "-")
filename = f"{filename}-{tag}.tar"
2026-03-12 23:37:21 +01:00
2026-03-13 00:13:52 +01:00
# Get hostname for API
hostname = os.uname().nodename
2026-03-12 23:37:21 +01:00
2026-03-13 00:13:52 +01:00
# Use Proxmox's pvesh API to download the OCI image
# This is exactly what the GUI does
rc, out, err = _run_pve_cmd([
"pvesh", "create",
f"/nodes/{hostname}/storage/{storage}/download-url",
"--content", "vztmpl",
"--filename", filename,
"--url", f"docker://{full_ref}"
], timeout=600)
2026-03-12 23:37:21 +01:00
2026-03-13 00:13:52 +01:00
if rc != 0:
# Fallback: try direct skopeo if pvesh API fails
logger.warning(f"pvesh download failed: {err}, trying skopeo fallback")
2026-03-12 23:47:37 +01:00
2026-03-13 00:13:52 +01:00
if not shutil.which("skopeo"):
result["message"] = f"Failed to pull image via API: {err}"
2026-03-12 23:47:37 +01:00
return result
2026-03-13 00:13:52 +01:00
# Get template directory
template_dir = "/var/lib/vz/template/cache"
rc2, out2, _ = _run_pve_cmd(["pvesm", "path", f"{storage}:vztmpl/test"])
if rc2 == 0 and out2.strip():
template_dir = os.path.dirname(out2.strip())
2026-03-13 19:19:32 +01:00
template_path = os.path.join(template_dir, filename)
2026-03-13 00:13:52 +01:00
2026-03-13 20:12:35 +01:00
# Use skopeo with docker-archive format (works with multi-layer images in Proxmox 9.1)
# Note: oci-archive fails with multi-layer images, docker-archive works
2026-03-13 00:13:52 +01:00
try:
proc = subprocess.run(
2026-03-13 20:12:35 +01:00
["skopeo", "copy", "--override-os", "linux", "--override-arch", "amd64",
f"docker://{full_ref}", f"docker-archive:{template_path}"],
2026-03-13 00:13:52 +01:00
capture_output=True,
text=True,
timeout=600
)
2026-03-12 23:47:37 +01:00
2026-03-13 00:13:52 +01:00
if proc.returncode != 0:
result["message"] = f"Failed to pull image: {proc.stderr}"
logger.error(f"skopeo copy failed: {proc.stderr}")
return result
except subprocess.TimeoutExpired:
result["message"] = "Image pull timed out after 10 minutes"
return result
except Exception as e:
result["message"] = f"Failed to pull image: {e}"
logger.error(f"Pull failed: {e}")
return result
2026-03-12 23:37:21 +01:00
2026-03-13 00:13:52 +01:00
# Template was created via API or skopeo
2026-03-12 23:37:21 +01:00
result["success"] = True
2026-03-12 23:47:37 +01:00
result["template"] = f"{storage}:vztmpl/{filename}"
2026-03-12 23:37:21 +01:00
result["message"] = "Image pulled successfully"
print(f"[OK] Image pulled: {result['template']}")
return result
2026-03-12 21:30:44 +01:00
# =================================================================
# Catalog Management
# =================================================================
def load_catalog() -> Dict[str, Any]:
"""Load the OCI app catalog."""
ensure_oci_directories()
2026-03-12 23:37:21 +01:00
for path in [CATALOG_FILE, SCRIPTS_CATALOG, DEV_SCRIPTS_CATALOG]:
if os.path.exists(path):
2026-03-12 22:50:20 +01:00
try:
2026-03-12 23:37:21 +01:00
with open(path, 'r') as f:
return json.load(f)
2026-03-12 22:50:20 +01:00
except Exception as e:
2026-03-12 23:37:21 +01:00
logger.error(f"Failed to load catalog from {path}: {e}")
2026-03-12 22:50:20 +01:00
return {"version": "1.0.0", "apps": {}}
2026-03-12 21:30:44 +01:00
def get_app_definition(app_id: str) -> Optional[Dict[str, Any]]:
"""Get the definition for a specific app."""
catalog = load_catalog()
return catalog.get("apps", {}).get(app_id)
def list_available_apps() -> List[Dict[str, Any]]:
"""List all available apps from the catalog."""
catalog = load_catalog()
apps = []
for app_id, app_def in catalog.get("apps", {}).items():
apps.append({
"id": app_id,
"name": app_def.get("name", app_id),
"short_name": app_def.get("short_name", app_def.get("name", app_id)),
"category": app_def.get("category", "uncategorized"),
"subcategory": app_def.get("subcategory", ""),
"icon": app_def.get("icon", "box"),
"color": app_def.get("color", "#6366F1"),
"summary": app_def.get("summary", ""),
"installed": is_installed(app_id)
})
return apps
# =================================================================
# Installed Apps Management
# =================================================================
def _load_installed() -> Dict[str, Any]:
"""Load the installed apps registry."""
ensure_oci_directories()
if not os.path.exists(INSTALLED_FILE):
return {"version": "1.0.0", "instances": {}}
try:
with open(INSTALLED_FILE, 'r') as f:
return json.load(f)
2026-03-12 23:37:21 +01:00
except Exception:
2026-03-12 21:30:44 +01:00
return {"version": "1.0.0", "instances": {}}
def _save_installed(data: Dict[str, Any]) -> bool:
"""Save the installed apps registry."""
try:
with open(INSTALLED_FILE, 'w') as f:
json.dump(data, f, indent=2)
return True
except Exception as e:
2026-03-12 23:37:21 +01:00
logger.error(f"Failed to save installed: {e}")
2026-03-12 21:30:44 +01:00
return False
def is_installed(app_id: str) -> bool:
"""Check if an app is installed."""
installed = _load_installed()
return app_id in installed.get("instances", {})
def list_installed_apps() -> List[Dict[str, Any]]:
"""List all installed apps with their status."""
installed = _load_installed()
apps = []
for app_id, instance in installed.get("instances", {}).items():
status = get_app_status(app_id)
apps.append({
"id": app_id,
"instance_name": instance.get("instance_name", app_id),
2026-03-12 23:37:21 +01:00
"vmid": instance.get("vmid"),
2026-03-12 21:30:44 +01:00
"installed_at": instance.get("installed_at"),
"installed_by": instance.get("installed_by", "unknown"),
"status": status
})
return apps
def get_installed_app(app_id: str) -> Optional[Dict[str, Any]]:
"""Get details of an installed app."""
installed = _load_installed()
instance = installed.get("instances", {}).get(app_id)
if not instance:
return None
instance["status"] = get_app_status(app_id)
return instance
# =================================================================
# Container Status
# =================================================================
def get_app_status(app_id: str) -> Dict[str, Any]:
2026-03-12 23:37:21 +01:00
"""Get the current status of an app's LXC container."""
2026-03-12 21:30:44 +01:00
result = {
"state": "not_installed",
"health": "unknown",
"uptime_seconds": 0,
"last_check": datetime.now().isoformat()
}
if not is_installed(app_id):
return result
2026-03-12 23:37:21 +01:00
vmid = _get_vmid_for_app(app_id)
if not vmid:
result["state"] = "error"
result["health"] = "unknown"
return result
# Get LXC status using pct
rc, out, _ = _run_pve_cmd(["pct", "status", str(vmid)])
2026-03-12 21:30:44 +01:00
if rc != 0:
result["state"] = "error"
result["health"] = "unhealthy"
return result
2026-03-12 23:37:21 +01:00
# Parse status: "status: running" or "status: stopped"
if "running" in out.lower():
result["state"] = "running"
result["health"] = "healthy"
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Get uptime
rc, out, _ = _run_pve_cmd(["pct", "exec", str(vmid), "--", "cat", "/proc/uptime"])
if rc == 0:
2026-03-12 21:30:44 +01:00
try:
2026-03-12 23:37:21 +01:00
uptime = float(out.split()[0])
result["uptime_seconds"] = int(uptime)
2026-03-12 21:30:44 +01:00
except:
pass
2026-03-12 23:37:21 +01:00
elif "stopped" in out.lower():
result["state"] = "stopped"
result["health"] = "stopped"
else:
result["state"] = "unknown"
2026-03-12 21:30:44 +01:00
return result
# =================================================================
# Deployment
# =================================================================
def deploy_app(app_id: str, config: Dict[str, Any], installed_by: str = "web") -> Dict[str, Any]:
"""
2026-03-12 23:37:21 +01:00
Deploy an OCI app as a Proxmox LXC container.
2026-03-12 21:30:44 +01:00
Args:
app_id: ID of the app from the catalog
config: User configuration values
installed_by: Source of installation ('web' or 'cli')
Returns:
Dict with success status and details
"""
result = {
"success": False,
"message": "",
2026-03-12 23:37:21 +01:00
"app_id": app_id,
"vmid": None
2026-03-12 21:30:44 +01:00
}
2026-03-12 23:37:21 +01:00
# Check Proxmox OCI support
pve_info = check_proxmox_version()
if not pve_info["oci_support"]:
result["message"] = pve_info.get("error", "OCI containers require Proxmox VE 9.1+")
2026-03-12 21:30:44 +01:00
return result
# Get app definition
app_def = get_app_definition(app_id)
if not app_def:
2026-03-12 23:37:21 +01:00
result["message"] = f"App '{app_id}' not found in catalog"
2026-03-12 21:30:44 +01:00
return result
# Check if already installed
if is_installed(app_id):
result["message"] = f"App '{app_id}' is already installed"
return result
container_def = app_def.get("container", {})
2026-03-13 20:12:35 +01:00
container_type = container_def.get("type", "oci")
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Get next available VMID
vmid = _get_next_vmid()
result["vmid"] = vmid
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
hostname = config.get("hostname", f"proxmenux-{app_id}")
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
logger.info(f"Deploying {app_id} as LXC {vmid}")
print(f"[*] Deploying {app_id} as LXC container (VMID: {vmid})")
2026-03-12 21:30:44 +01:00
2026-03-13 20:12:35 +01:00
# Determine deployment method: LXC traditional or OCI image
if container_type == "lxc":
# Use traditional LXC with Alpine template + package installation
# This is more reliable than OCI images which have bugs in Proxmox 9.1
template = _find_alpine_template()
if not template:
result["message"] = "Alpine template not found. Please download it from CT Templates."
return result
print(f"[*] Using LXC template: {template}")
use_oci = False
else:
# OCI image deployment (may have issues with multi-layer images)
image = container_def.get("image", "")
if not image:
result["message"] = "No container image specified in app definition"
return result
if ":" in image:
image_name, tag = image.rsplit(":", 1)
else:
image_name, tag = image, "latest"
print(f"[*] Pulling OCI image: {image}")
pull_result = pull_oci_image(image_name, tag)
if not pull_result["success"]:
result["message"] = pull_result["message"]
return result
template = pull_result["template"]
use_oci = True
2026-03-12 23:37:21 +01:00
# Step 2: Create LXC container
print(f"[*] Creating LXC container...")
pct_cmd = [
"pct", "create", str(vmid), template,
"--hostname", hostname,
"--memory", str(container_def.get("memory", 512)),
"--cores", str(container_def.get("cores", 1)),
"--rootfs", f"local-lvm:{container_def.get('disk_size', 4)}",
"--unprivileged", "0" if container_def.get("privileged") else "1",
"--onboot", "1"
]
2026-03-12 21:30:44 +01:00
2026-03-13 20:12:35 +01:00
# Add ostype for OCI containers
if use_oci:
pct_cmd.extend(["--ostype", "unmanaged"])
# Add features (nesting, etc.)
features = container_def.get("features", [])
if features:
pct_cmd.extend(["--features", ",".join(features)])
2026-03-13 00:13:52 +01:00
# Network configuration - use simple bridge with DHCP
pct_cmd.extend(["--net0", "name=eth0,bridge=vmbr0,ip=dhcp"])
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Run pct create
rc, out, err = _run_pve_cmd(pct_cmd, timeout=120)
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
if rc != 0:
result["message"] = f"Failed to create container: {err}"
logger.error(f"pct create failed: {err}")
return result
2026-03-13 00:13:52 +01:00
# Step 3: Apply extra LXC configuration (for unprivileged containers)
lxc_config = container_def.get("lxc_config", [])
if lxc_config:
conf_file = f"/etc/pve/lxc/{vmid}.conf"
try:
with open(conf_file, 'a') as f:
f.write("\n# ProxMenux OCI extra config\n")
for config_line in lxc_config:
f.write(f"{config_line}\n")
logger.info(f"Applied extra LXC config to {conf_file}")
except Exception as e:
logger.warning(f"Could not apply extra LXC config: {e}")
2026-03-13 20:12:35 +01:00
# Step 4: Configure environment variables (only for OCI containers)
if use_oci:
env_vars = []
for env in container_def.get("environment", []):
env_name = env.get("name", "")
env_value = env.get("value", "")
if env_value.startswith("$"):
config_key = env_value[1:]
env_value = config.get(config_key, env.get("default", ""))
if env_name and env_value:
env_vars.append(f"{env_name}={env_value}")
2026-03-12 21:30:44 +01:00
2026-03-13 20:12:35 +01:00
if env_vars:
for env in env_vars:
_run_pve_cmd(["pct", "set", str(vmid), f"--lxc.environment", env])
2026-03-12 21:30:44 +01:00
2026-03-13 00:13:52 +01:00
# Step 5: Enable IP forwarding if needed (for VPN containers)
2026-03-13 20:12:35 +01:00
if container_def.get("requires_ip_forward"):
2026-03-12 23:37:21 +01:00
_enable_host_ip_forwarding()
2026-03-13 00:13:52 +01:00
# Step 6: Start the container
2026-03-12 23:37:21 +01:00
print(f"[*] Starting container...")
rc, _, err = _run_pve_cmd(["pct", "start", str(vmid)])
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
if rc != 0:
result["message"] = f"Container created but failed to start: {err}"
logger.error(f"pct start failed: {err}")
2026-03-13 20:12:35 +01:00
return result
# Step 7: For LXC containers, install packages and configure
if not use_oci:
packages = container_def.get("packages", [])
install_method = container_def.get("install_method", "apk")
if packages:
if not _install_packages_in_lxc(vmid, packages, install_method):
result["message"] = "Container created but package installation failed"
return result
services = container_def.get("services", [])
if services:
_enable_services_in_lxc(vmid, services, install_method)
# Special handling for Tailscale
if "tailscale" in packages:
_configure_tailscale(vmid, config)
2026-03-12 21:30:44 +01:00
2026-03-13 20:12:35 +01:00
# Step 8: Save instance data
2026-03-12 23:37:21 +01:00
instance_dir = os.path.join(INSTANCES_DIR, app_id)
os.makedirs(instance_dir, exist_ok=True)
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
config_file = os.path.join(instance_dir, "config.json")
config_schema = app_def.get("config_schema", {})
encrypted_config = encrypt_config_sensitive_fields(config, config_schema)
with open(config_file, 'w') as f:
json.dump({
"app_id": app_id,
"vmid": vmid,
"created_at": datetime.now().isoformat(),
"values": encrypted_config
}, f, indent=2)
os.chmod(config_file, 0o600)
# Update installed registry
2026-03-12 21:30:44 +01:00
installed = _load_installed()
installed["instances"][app_id] = {
"app_id": app_id,
2026-03-12 23:37:21 +01:00
"vmid": vmid,
"instance_name": hostname,
2026-03-12 21:30:44 +01:00
"installed_at": datetime.now().isoformat(),
"installed_by": installed_by,
2026-03-13 20:12:35 +01:00
"template": template,
"container_type": container_type
2026-03-12 21:30:44 +01:00
}
2026-03-12 23:37:21 +01:00
_save_installed(installed)
2026-03-12 21:30:44 +01:00
result["success"] = True
2026-03-12 23:37:21 +01:00
result["message"] = f"App deployed successfully as LXC {vmid}"
print(f"[OK] Container {vmid} ({hostname}) deployed successfully!")
2026-03-12 21:30:44 +01:00
return result
2026-03-12 23:37:21 +01:00
def _enable_host_ip_forwarding() -> bool:
"""Enable IP forwarding on the Proxmox host."""
logger.info("Enabling IP forwarding on host")
print("[*] Enabling IP forwarding...")
try:
# Enable IPv4 forwarding
with open("/proc/sys/net/ipv4/ip_forward", 'w') as f:
f.write('1')
# Enable IPv6 forwarding
ipv6_path = "/proc/sys/net/ipv6/conf/all/forwarding"
if os.path.exists(ipv6_path):
with open(ipv6_path, 'w') as f:
f.write('1')
# Make persistent
sysctl_d = "/etc/sysctl.d/99-proxmenux-ip-forward.conf"
with open(sysctl_d, 'w') as f:
f.write("net.ipv4.ip_forward = 1\n")
f.write("net.ipv6.conf.all.forwarding = 1\n")
subprocess.run(["sysctl", "-p", sysctl_d], capture_output=True)
return True
except Exception as e:
logger.warning(f"Could not enable IP forwarding: {e}")
return False
2026-03-12 21:30:44 +01:00
# =================================================================
2026-03-12 23:37:21 +01:00
# Container Control
2026-03-12 21:30:44 +01:00
# =================================================================
def start_app(app_id: str) -> Dict[str, Any]:
2026-03-12 23:37:21 +01:00
"""Start an app's LXC container."""
result = {"success": False, "message": ""}
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
vmid = _get_vmid_for_app(app_id)
if not vmid:
result["message"] = f"App '{app_id}' not found"
return result
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
rc, _, err = _run_pve_cmd(["pct", "start", str(vmid)])
if rc == 0:
result["success"] = True
result["message"] = f"Container {vmid} started"
else:
result["message"] = f"Failed to start: {err}"
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
return result
2026-03-12 21:30:44 +01:00
def stop_app(app_id: str) -> Dict[str, Any]:
2026-03-12 23:37:21 +01:00
"""Stop an app's LXC container."""
result = {"success": False, "message": ""}
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
vmid = _get_vmid_for_app(app_id)
if not vmid:
result["message"] = f"App '{app_id}' not found"
return result
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
rc, _, err = _run_pve_cmd(["pct", "stop", str(vmid)])
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
if rc == 0:
result["success"] = True
result["message"] = f"Container {vmid} stopped"
else:
result["message"] = f"Failed to stop: {err}"
return result
2026-03-12 21:30:44 +01:00
def restart_app(app_id: str) -> Dict[str, Any]:
2026-03-12 23:37:21 +01:00
"""Restart an app's LXC container."""
result = {"success": False, "message": ""}
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
vmid = _get_vmid_for_app(app_id)
if not vmid:
result["message"] = f"App '{app_id}' not found"
return result
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
rc, _, err = _run_pve_cmd(["pct", "reboot", str(vmid)])
if rc == 0:
result["success"] = True
result["message"] = f"Container {vmid} restarted"
else:
result["message"] = f"Failed to restart: {err}"
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
return result
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
def remove_app(app_id: str, remove_data: bool = True) -> Dict[str, Any]:
"""Remove an app's LXC container."""
result = {"success": False, "message": ""}
vmid = _get_vmid_for_app(app_id)
if not vmid:
result["message"] = f"App '{app_id}' not found"
return result
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
logger.info(f"Removing app {app_id} (VMID: {vmid})")
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Stop if running
_run_pve_cmd(["pct", "stop", str(vmid)])
time.sleep(2)
# Destroy container
rc, _, err = _run_pve_cmd(["pct", "destroy", str(vmid), "--purge"])
2026-03-12 21:30:44 +01:00
if rc != 0:
2026-03-12 23:37:21 +01:00
result["message"] = f"Failed to destroy container: {err}"
return result
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Remove from installed registry
2026-03-12 21:30:44 +01:00
installed = _load_installed()
if app_id in installed.get("instances", {}):
del installed["instances"][app_id]
_save_installed(installed)
2026-03-12 23:37:21 +01:00
# Remove instance data
2026-03-12 21:30:44 +01:00
if remove_data:
instance_dir = os.path.join(INSTANCES_DIR, app_id)
if os.path.exists(instance_dir):
2026-03-12 23:37:21 +01:00
shutil.rmtree(instance_dir)
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
result["success"] = True
result["message"] = f"App {app_id} removed successfully"
return result
2026-03-12 21:30:44 +01:00
def get_app_logs(app_id: str, lines: int = 100) -> Dict[str, Any]:
2026-03-12 23:37:21 +01:00
"""Get logs from an app's LXC container."""
result = {"success": False, "message": "", "logs": ""}
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
vmid = _get_vmid_for_app(app_id)
if not vmid:
result["message"] = f"App '{app_id}' not found"
return result
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Try to get logs from inside container
rc, out, err = _run_pve_cmd([
"pct", "exec", str(vmid), "--",
"tail", "-n", str(lines), "/var/log/messages"
])
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
if rc != 0:
# Try journalctl
rc, out, err = _run_pve_cmd([
"pct", "exec", str(vmid), "--",
"journalctl", "-n", str(lines), "--no-pager"
])
if rc == 0:
result["success"] = True
result["logs"] = out
else:
result["message"] = f"Failed to get logs: {err}"
result["logs"] = err
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
return result
2026-03-12 21:30:44 +01:00
# =================================================================
2026-03-12 23:37:21 +01:00
# Network Detection
2026-03-12 21:30:44 +01:00
# =================================================================
2026-03-12 23:37:21 +01:00
def detect_host_networks() -> List[Dict[str, Any]]:
"""Detect available networks on the Proxmox host."""
networks = []
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
try:
# Get bridges from Proxmox
rc, out, _ = _run_pve_cmd(["pvesh", "get", "/nodes/localhost/network", "--output-format", "json"])
if rc == 0:
ifaces = json.loads(out)
for iface in ifaces:
if iface.get("type") == "bridge":
networks.append({
"interface": iface.get("iface", ""),
"type": "bridge",
"address": iface.get("address", ""),
"cidr": iface.get("cidr", ""),
"recommended": True
})
except Exception as e:
logger.error(f"Failed to detect networks: {e}")
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
return networks
2026-03-13 00:13:52 +01:00
# =================================================================
# Network Detection
# =================================================================
def detect_networks() -> List[Dict[str, str]]:
"""
Detect available network interfaces and their subnets.
Used for suggesting routes to advertise via Tailscale.
Returns:
List of dicts with interface name and subnet.
"""
networks = []
try:
# Use ip command to get interfaces and their addresses
proc = subprocess.run(
["ip", "-j", "addr", "show"],
capture_output=True,
text=True
)
if proc.returncode == 0:
interfaces = json.loads(proc.stdout)
for iface in interfaces:
name = iface.get("ifname", "")
# Skip loopback and virtual interfaces
if name in ("lo", "docker0") or name.startswith(("veth", "br-", "tap", "fwbr", "fwpr")):
continue
# Get IPv4 addresses
for addr_info in iface.get("addr_info", []):
if addr_info.get("family") == "inet":
ip = addr_info.get("local", "")
prefix = addr_info.get("prefixlen", 24)
if ip and not ip.startswith("127."):
# Calculate network address
ip_parts = ip.split(".")
if len(ip_parts) == 4:
# Simple network calculation
if prefix >= 24:
network = f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.0/{prefix}"
elif prefix >= 16:
network = f"{ip_parts[0]}.{ip_parts[1]}.0.0/{prefix}"
else:
network = f"{ip_parts[0]}.0.0.0/{prefix}"
networks.append({
"interface": name,
"subnet": network,
"ip": ip
})
except Exception as e:
logger.error(f"Failed to detect networks: {e}")
return networks
2026-03-14 20:30:53 +01:00
# =================================================================
# Update Auth Key (for Tailscale re-authentication)
# =================================================================
def update_auth_key(app_id: str, auth_key: str) -> Dict[str, Any]:
"""Update the Tailscale auth key for a running gateway."""
result = {"success": False, "message": "", "app_id": app_id}
# Get VMID for the app
vmid = _get_vmid_for_app(app_id)
if not vmid:
result["message"] = f"App {app_id} not found or not installed"
return result
# Check if container is running
status = get_app_status(app_id)
if status.get("state") != "running":
result["message"] = "Container must be running to update auth key"
return result
logger.info(f"Updating auth key for {app_id} (VMID: {vmid})")
print(f"[*] Updating auth key for {app_id}...")
# Run tailscale logout first to clear existing state
print(f"[*] Logging out of Tailscale...")
_run_pve_cmd(["pct", "exec", str(vmid), "--", "tailscale", "logout"], timeout=30)
# Wait a moment for logout to complete
import time
time.sleep(2)
# Run tailscale up with new auth key
print(f"[*] Authenticating with new key...")
# Load saved config to get original settings
config_file = os.path.join(INSTANCES_DIR, app_id, "config.json")
config = {}
if os.path.exists(config_file):
try:
with open(config_file) as f:
saved_config = json.load(f)
config = saved_config.get("values", {})
except:
pass
# Build tailscale up command
ts_cmd = ["tailscale", "up", f"--authkey={auth_key}"]
hostname = config.get("hostname")
if hostname:
ts_cmd.append(f"--hostname={hostname}")
advertise_routes = config.get("advertise_routes")
if advertise_routes:
if isinstance(advertise_routes, list):
advertise_routes = ",".join(advertise_routes)
ts_cmd.append(f"--advertise-routes={advertise_routes}")
if config.get("exit_node"):
ts_cmd.append("--advertise-exit-node")
if config.get("accept_routes"):
ts_cmd.append("--accept-routes")
rc, out, err = _run_pve_cmd(["pct", "exec", str(vmid), "--"] + ts_cmd, timeout=60)
if rc != 0:
logger.error(f"Failed to update auth key: {err}")
result["message"] = f"Failed to authenticate: {err}"
return result
print(f"[OK] Auth key updated successfully")
result["success"] = True
result["message"] = "Auth key updated successfully"
return result
2026-03-12 23:37:21 +01:00
# =================================================================
# Runtime Detection (for backward compatibility)
# =================================================================
def detect_runtime() -> Dict[str, Any]:
"""Check if Proxmox OCI support is available."""
pve_info = check_proxmox_version()
return {
"available": pve_info["oci_support"],
"runtime": "proxmox-lxc" if pve_info["oci_support"] else None,
"version": pve_info.get("version"),
"path": shutil.which("pct"),
"error": pve_info.get("error")
}
2026-03-12 21:30:44 +01:00
# =================================================================
# CLI Interface
# =================================================================
2026-03-12 23:37:21 +01:00
if __name__ == "__main__":
2026-03-12 21:30:44 +01:00
import argparse
parser = argparse.ArgumentParser(description="ProxMenux OCI Manager")
2026-03-12 23:37:21 +01:00
subparsers = parser.add_subparsers(dest="command", help="Commands")
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Deploy command
2026-03-12 21:30:44 +01:00
deploy_parser = subparsers.add_parser("deploy", help="Deploy an app")
2026-03-12 23:37:21 +01:00
deploy_parser.add_argument("--app-id", required=True, help="App ID")
deploy_parser.add_argument("--config", default="{}", help="JSON config")
# Status command
status_parser = subparsers.add_parser("status", help="Get app status")
status_parser.add_argument("--app-id", required=True, help="App ID")
2026-03-12 21:30:44 +01:00
2026-03-12 23:37:21 +01:00
# Control commands
start_parser = subparsers.add_parser("start", help="Start app")
2026-03-12 21:30:44 +01:00
start_parser.add_argument("--app-id", required=True)
2026-03-12 23:37:21 +01:00
stop_parser = subparsers.add_parser("stop", help="Stop app")
2026-03-12 21:30:44 +01:00
stop_parser.add_argument("--app-id", required=True)
2026-03-12 23:37:21 +01:00
remove_parser = subparsers.add_parser("remove", help="Remove app")
2026-03-12 21:30:44 +01:00
remove_parser.add_argument("--app-id", required=True)
2026-03-12 23:37:21 +01:00
# List commands
subparsers.add_parser("list", help="List available apps")
subparsers.add_parser("installed", help="List installed apps")
subparsers.add_parser("runtime", help="Check runtime")
2026-03-12 21:30:44 +01:00
args = parser.parse_args()
if args.command == "deploy":
config = json.loads(args.config)
2026-03-12 23:37:21 +01:00
result = deploy_app(args.app_id, config, "cli")
print(json.dumps(result, indent=2))
elif args.command == "status":
print(json.dumps(get_app_status(args.app_id), indent=2))
2026-03-12 21:30:44 +01:00
elif args.command == "start":
2026-03-12 23:37:21 +01:00
print(json.dumps(start_app(args.app_id), indent=2))
2026-03-12 21:30:44 +01:00
elif args.command == "stop":
2026-03-12 23:37:21 +01:00
print(json.dumps(stop_app(args.app_id), indent=2))
2026-03-12 21:30:44 +01:00
elif args.command == "remove":
2026-03-12 23:37:21 +01:00
print(json.dumps(remove_app(args.app_id), indent=2))
2026-03-12 21:30:44 +01:00
elif args.command == "list":
2026-03-12 23:37:21 +01:00
print(json.dumps(list_available_apps(), indent=2))
elif args.command == "installed":
print(json.dumps(list_installed_apps(), indent=2))
2026-03-12 21:30:44 +01:00
elif args.command == "runtime":
2026-03-12 23:37:21 +01:00
print(json.dumps(detect_runtime(), indent=2))
2026-03-12 21:30:44 +01:00
else:
parser.print_help()