Files
ProxMenux/AppImage/scripts/flask_notification_routes.py

713 lines
26 KiB
Python
Raw Normal View History

2026-02-18 17:24:26 +01:00
"""
Flask routes for notification service configuration and management.
Blueprint pattern matching flask_health_routes.py / flask_security_routes.py.
"""
2026-02-19 17:02:02 +01:00
import hmac
import time
import hashlib
from collections import deque
2026-02-18 17:24:26 +01:00
from flask import Blueprint, jsonify, request
from notification_manager import notification_manager
2026-02-19 17:02:02 +01:00
# ─── Webhook Hardening Helpers ───────────────────────────────────
class WebhookRateLimiter:
    """Sliding-window request counter guarding the webhook endpoint.

    At most ``max_requests`` calls to :meth:`allow` succeed within any
    ``window_seconds`` span; the limiter keeps one timestamp per accepted
    request and forgets those that have aged out of the window.
    """

    def __init__(self, max_requests: int = 60, window_seconds: int = 60):
        self._max = max_requests
        self._window = window_seconds
        # Accepted-request timestamps, oldest on the left.
        self._timestamps: deque = deque()

    def allow(self) -> bool:
        """Return True and record the request if it fits in the window."""
        current = time.time()
        accepted = self._timestamps

        # Drop timestamps that are strictly older than the window.
        while accepted:
            if current - accepted[0] <= self._window:
                break
            accepted.popleft()

        if len(accepted) < self._max:
            accepted.append(current)
            return True
        return False
class ReplayCache:
    """Bounded in-memory cache of recently seen request signatures.

    A signature counts as a replay when it was recorded less than ``ttl``
    seconds ago.  The cache never holds more than ``_MAX_SIZE`` entries:
    expired entries are pruned first and, if that is not enough, the oldest
    entries are evicted so memory use stays bounded even under a flood of
    distinct, still-valid signatures.
    """

    _MAX_SIZE = 2000  # Hard cap to prevent memory growth

    def __init__(self, ttl: int = 60):
        self._ttl = ttl
        # signature -> timestamp; insertion order is kept oldest-first.
        self._seen: dict = {}

    def check_and_record(self, signature: str) -> bool:
        """Return True if *signature* is a replay; otherwise record it.

        Args:
            signature: Opaque identifier of the request.

        Returns:
            True when the signature was recorded within the last ``ttl``
            seconds, False when it is new (or expired) and has now been
            recorded.
        """
        now = time.time()

        # Periodic cleanup of expired entries once the cache is half full.
        if len(self._seen) > self._MAX_SIZE // 2:
            cutoff = now - self._ttl
            self._seen = {k: v for k, v in self._seen.items() if v > cutoff}

        seen_at = self._seen.get(signature)
        if seen_at is not None and now - seen_at < self._ttl:
            return True  # Replay detected

        # Remove any expired record first so re-insertion places the
        # signature at the "newest" end of the dict.
        self._seen.pop(signature, None)

        # Enforce the hard cap: evict oldest entries to make room.
        while self._seen and len(self._seen) >= self._MAX_SIZE:
            del self._seen[next(iter(self._seen))]

        self._seen[signature] = now
        return False
# Module-level singletons (one per process).
# Both are shared by every request handled by this module, so their state
# (request timestamps / seen signatures) is global to the process.
_webhook_limiter = WebhookRateLimiter(max_requests=60, window_seconds=60)
_replay_cache = ReplayCache(ttl=60)
# Timestamp validation window (seconds): the maximum absolute difference
# allowed between the X-ProxMenux-Timestamp header and the local clock.
_TIMESTAMP_MAX_DRIFT = 60
2026-02-18 17:24:26 +01:00
# Blueprint that carries every /api/notifications/* route defined below.
notification_bp = Blueprint('notifications', __name__)
@notification_bp.route('/api/notifications/settings', methods=['GET'])
def get_notification_settings():
    """Return the current notification settings as JSON.

    Responds with HTTP 500 and ``{'error': <message>}`` if the notification
    manager raises.
    """
    try:
        return jsonify(notification_manager.get_settings())
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/settings', methods=['POST'])
def save_notification_settings():
    """Save notification settings sent by the UI.

    Expects a non-empty JSON body, which is handed to
    ``notification_manager.save_settings``.

    Returns:
        The manager's result as JSON; HTTP 400 when the body is missing,
        empty, or not valid JSON; HTTP 500 when saving raises.
    """
    try:
        # silent=True: a malformed body or wrong Content-Type yields None
        # instead of raising, so the client gets a 400 rather than a 500.
        payload = request.get_json(silent=True)
        if not payload:
            return jsonify({'error': 'No data provided'}), 400

        result = notification_manager.save_settings(payload)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@notification_bp.route('/api/notifications/test', methods=['POST'])
def test_notification():
    """Send a test notification to one channel or to all of them.

    The optional JSON body may carry ``{"channel": <name>}``; when the body
    is absent, unparsable, or not a JSON object the channel defaults to
    ``'all'``.

    Returns:
        The manager's result as JSON, or HTTP 500 with an error message.
    """
    try:
        # silent=True keeps the "no body means all channels" fallback working
        # even when the request has no JSON Content-Type or an invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        channel = data.get('channel', 'all')
        result = notification_manager.test_channel(channel)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@notification_bp.route('/api/notifications/status', methods=['GET'])
def get_notification_status():
    """Return the notification service status as JSON.

    Responds with HTTP 500 and ``{'error': <message>}`` if the notification
    manager raises.
    """
    try:
        return jsonify(notification_manager.get_status())
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/history', methods=['GET'])
def get_notification_history():
    """Return notification history, optionally filtered.

    Query parameters: ``limit`` (int, default 100), ``offset`` (int,
    default 0), ``severity`` and ``channel`` (strings, default empty).
    Responds with HTTP 500 and an error message if anything raises.
    """
    try:
        query = request.args
        page_size = query.get('limit', 100, type=int)
        start = query.get('offset', 0, type=int)
        severity_filter = query.get('severity', '')
        channel_filter = query.get('channel', '')
        history = notification_manager.get_history(
            page_size, start, severity_filter, channel_filter
        )
        return jsonify(history)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/history', methods=['DELETE'])
def clear_notification_history():
    """Delete the stored notification history and return the manager's result.

    Responds with HTTP 500 and ``{'error': <message>}`` if the notification
    manager raises.
    """
    try:
        return jsonify(notification_manager.clear_history())
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/send', methods=['POST'])
def send_notification():
    """Send a notification via API (for testing or external triggers).

    Expects a JSON object; recognised keys are ``event_type`` (default
    ``'custom'``), ``severity`` (default ``'INFO'``), ``title``, ``message``
    and ``data``.  The notification is sent with ``source='api'``.

    Returns:
        The manager's result as JSON; HTTP 400 when the body is missing,
        unparsable, or not a JSON object; HTTP 500 when sending raises.
    """
    try:
        # silent=True: invalid JSON / wrong Content-Type becomes None so the
        # caller receives a 400 instead of a 500 from the broad handler below.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'No data provided'}), 400
        if not isinstance(data, dict):
            # A JSON array or scalar has no .get(); reject it explicitly.
            return jsonify({'error': 'Invalid payload: expected a JSON object'}), 400

        result = notification_manager.send_notification(
            event_type=data.get('event_type', 'custom'),
            severity=data.get('severity', 'INFO'),
            title=data.get('title', ''),
            message=data.get('message', ''),
            data=data.get('data', {}),
            source='api',
        )
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
2026-02-19 17:02:02 +01:00
2026-02-19 19:56:20 +01:00
# ── PVE config constants ──
# Names of the webhook endpoint and matcher blocks this module writes into
# the PVE notification config.
_PVE_ENDPOINT_ID = 'proxmenux-webhook'
_PVE_MATCHER_ID = 'proxmenux-default'
# URL written as the endpoint's target; its path matches the
# /api/notifications/webhook route defined in this module.
_PVE_WEBHOOK_URL = 'http://127.0.0.1:8008/api/notifications/webhook'
# Main and private PVE notification config files that get read and rewritten.
_PVE_NOTIFICATIONS_CFG = '/etc/pve/notifications.cfg'
_PVE_PRIV_CFG = '/etc/pve/priv/notifications.cfg'
# Exact header lines identifying "our" blocks; used when removing previously
# written blocks from the config files.
_PVE_OUR_HEADERS = {
    f'webhook: {_PVE_ENDPOINT_ID}',
    f'matcher: {_PVE_MATCHER_ID}',
}
def _pve_read_file(path):
"""Read file, return (content, error). Content is '' if missing."""
try:
with open(path, 'r') as f:
return f.read(), None
except FileNotFoundError:
return '', None
except PermissionError:
return None, f'Permission denied reading {path}'
except Exception as e:
return None, str(e)
def _pve_backup_file(path):
"""Create timestamped backup if file exists. Never fails fatally."""
import os, shutil
from datetime import datetime
try:
if os.path.exists(path):
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
backup = f"{path}.proxmenux_backup_{ts}"
shutil.copy2(path, backup)
except Exception:
pass
def _pve_remove_our_blocks(text, headers_to_remove):
"""Remove only blocks whose header line matches one of ours.
Preserves ALL other content byte-for-byte.
A block = header line + indented continuation lines + trailing blank line.
"""
lines = text.splitlines(keepends=True)
cleaned = []
skip_block = False
for line in lines:
stripped = line.strip()
if stripped and not line[0:1].isspace() and ':' in stripped:
if stripped in headers_to_remove:
skip_block = True
continue
else:
skip_block = False
if skip_block:
if not stripped:
skip_block = False
continue
elif line[0:1].isspace():
continue
else:
skip_block = False
cleaned.append(line)
return ''.join(cleaned)
2026-02-21 19:56:50 +01:00
def _build_webhook_fallback():
    """Return the manual setup instructions as a list of text lines.

    The lines describe what to append to the main and private PVE
    notification config files and the ``pvesh`` command that sets the
    webhook body template.
    """
    body_tpl = (
        '{'
        '"title":"{{ escape title }}",'
        '"message":"{{ escape message }}",'
        '"severity":"{{ severity }}",'
        '"timestamp":"{{ timestamp }}",'
        '"type":"{{#if fields.type}}{{ fields.type }}{{else}}test{{/if}}",'
        '"hostname":"{{#if fields.hostname}}{{ fields.hostname }}{{else}}unknown{{/if}}"'
        '}'
    )

    main_cfg_steps = [
        "# 1. Append to END of /etc/pve/notifications.cfg",
        "# (do NOT delete existing content):",
        "",
        f"webhook: {_PVE_ENDPOINT_ID}",
        "\tmethod post",
        f"\turl {_PVE_WEBHOOK_URL}",
        "",
        f"matcher: {_PVE_MATCHER_ID}",
        f"\ttarget {_PVE_ENDPOINT_ID}",
        "\tmode all",
        "",
    ]
    priv_cfg_steps = [
        "# 2. Append to /etc/pve/priv/notifications.cfg :",
        f"webhook: {_PVE_ENDPOINT_ID}",
        "",
    ]
    body_steps = [
        "# 3. Set body via pvesh (NOT in the config file -- PVE stores it base64):",
        f"pvesh set /cluster/notifications/endpoints/webhook/{_PVE_ENDPOINT_ID} --body '{body_tpl}'",
    ]
    return main_cfg_steps + priv_cfg_steps + body_steps
2026-02-21 19:56:50 +01:00
def setup_pve_webhook_core() -> dict:
    """Core logic to configure the PVE webhook. Callable from anywhere.

    Rewrites the main and private PVE notification config files so that they
    contain exactly one ``proxmenux-webhook`` endpoint and one
    ``proxmenux-default`` matcher, then sets the endpoint body template via
    ``pvesh``.  Previously written blocks with those names are removed first,
    so repeated calls produce the same configuration.

    Returns:
        dict with keys ``configured`` (bool), ``endpoint_id``, ``matcher_id``,
        ``url``, ``fallback_commands`` (manual instructions, filled on
        failure) and ``error`` (message or None).  On success ``secret`` is
        added; ``body_config_warning`` is added when the ``pvesh`` step fails.
    """
    import secrets as secrets_mod
    import subprocess

    result = {
        'configured': False,
        'endpoint_id': _PVE_ENDPOINT_ID,
        'matcher_id': _PVE_MATCHER_ID,
        'url': _PVE_WEBHOOK_URL,
        'fallback_commands': [],
        'error': None,
    }

    def _fail(message):
        """Mark the result as failed and attach the manual fallback steps."""
        result['error'] = message
        result['fallback_commands'] = _build_webhook_fallback()
        return result

    def _with_blank_separator(text):
        """Ensure non-empty *text* ends with exactly one blank separator line."""
        if text and not text.endswith('\n'):
            text += '\n'
        if text and not text.endswith('\n\n'):
            text += '\n'
        return text

    try:
        # ── Step 1: Ensure webhook secret exists (for our own internal use) ──
        secret = notification_manager.get_webhook_secret()
        if not secret:
            secret = secrets_mod.token_urlsafe(32)
            notification_manager._save_setting('webhook_secret', secret)

        # ── Step 2: Read main config ──
        cfg_text, err = _pve_read_file(_PVE_NOTIFICATIONS_CFG)
        if err:
            return _fail(err)

        # ── Step 3: Read priv config ──
        # A missing file yields '' (no error).  A real read error must abort
        # here, BEFORE anything is modified: continuing would overwrite the
        # private config with only our block and lose whatever it contained.
        priv_text, err = _pve_read_file(_PVE_PRIV_CFG)
        if err:
            return _fail(err)

        # ── Step 4: Create backups before ANY modification ──
        _pve_backup_file(_PVE_NOTIFICATIONS_CFG)
        _pve_backup_file(_PVE_PRIV_CFG)

        # ── Step 5: Remove any previous proxmenux blocks from BOTH files ──
        cleaned_cfg = _pve_remove_our_blocks(cfg_text, _PVE_OUR_HEADERS)
        cleaned_priv = _pve_remove_our_blocks(priv_text, _PVE_OUR_HEADERS)

        # ── Step 6: Build new blocks ──
        # Write ONLY basic properties (method, url) to the config file.
        # body and header MUST be set via pvesh API (Step 10) because PVE
        # stores them base64-encoded internally. Writing them as plain text
        # to the config file corrupts PVE's config parser.
        endpoint_block = (
            f"webhook: {_PVE_ENDPOINT_ID}\n"
            f"\tmethod post\n"
            f"\turl {_PVE_WEBHOOK_URL}\n"
        )
        matcher_block = (
            f"matcher: {_PVE_MATCHER_ID}\n"
            f"\ttarget {_PVE_ENDPOINT_ID}\n"
            f"\tmode all\n"
        )

        # ── Step 7: Append our blocks to cleaned main config ──
        new_cfg = _with_blank_separator(cleaned_cfg) + endpoint_block + '\n' + matcher_block

        # ── Step 8: Write main config ──
        try:
            with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                f.write(new_cfg)
        except PermissionError:
            return _fail(f'Permission denied writing {_PVE_NOTIFICATIONS_CFG}')
        except Exception as e:
            # Best-effort rollback to the content read in Step 2.
            try:
                with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                    f.write(cfg_text)
            except Exception:
                pass
            return _fail(str(e))

        # ── Step 9: Write priv config with our webhook entry ──
        # PVE REQUIRES a matching block in priv/notifications.cfg for every
        # webhook endpoint, even if it has no secrets. Without it PVE throws:
        #   "Could not instantiate endpoint: private config does not exist"
        priv_block = f"webhook: {_PVE_ENDPOINT_ID}\n"
        new_priv = _with_blank_separator(cleaned_priv) + priv_block
        try:
            with open(_PVE_PRIV_CFG, 'w') as f:
                f.write(new_priv)
        except PermissionError:
            return _fail(f'Permission denied writing {_PVE_PRIV_CFG}')
        except Exception as e:
            # The priv block is mandatory, so a failed write is reported
            # instead of being silently ignored.  Best-effort rollback first.
            try:
                with open(_PVE_PRIV_CFG, 'w') as f:
                    f.write(priv_text)
            except Exception:
                pass
            return _fail(str(e))

        # ── Step 10: Configure body via pvesh API ──
        # body and header are stored base64-encoded in the config file.
        # Writing them as plain text corrupts PVE's parser. pvesh handles
        # the encoding correctly.
        pvesh_path = f'/cluster/notifications/endpoints/webhook/{_PVE_ENDPOINT_ID}'

        # Body template using PVE Handlebars syntax.
        # - {{ escape title }} and {{ escape message }} are PVE built-in helpers
        # - {{ severity }} gives: info, notice, warning, error, unknown
        # - {{ fields.X }} is populated for real events (backup, replication, etc.)
        #   but NOT for test notifications, so we use {{#if}} fallbacks.
        # - {{ timestamp }} gives epoch seconds
        body_template = (
            '{'
            '"title":"{{ escape title }}",'
            '"message":"{{ escape message }}",'
            '"severity":"{{ severity }}",'
            '"timestamp":"{{ timestamp }}",'
            '"type":"{{#if fields.type}}{{ fields.type }}{{else}}test{{/if}}",'
            '"hostname":"{{#if fields.hostname}}{{ fields.hostname }}{{else}}unknown{{/if}}"'
            '}'
        )

        try:
            # No header needed: our webhook handler parses JSON from raw
            # body regardless of Content-Type.
            r1 = subprocess.run(
                ['pvesh', 'set', pvesh_path, '--body', body_template],
                capture_output=True, text=True, timeout=10
            )
            if r1.returncode != 0:
                result['body_config_warning'] = f'pvesh set --body failed: {r1.stderr.strip()}'
        except Exception as e:
            # Non-fatal: if pvesh fails, webhook still receives POSTs (just empty body)
            result['body_config_warning'] = str(e)

        result['configured'] = True
        result['secret'] = secret
        return result

    except Exception as e:
        return _fail(str(e))
2026-02-19 18:37:42 +01:00
2026-02-21 19:56:50 +01:00
@notification_bp.route('/api/notifications/proxmox/setup-webhook', methods=['POST'])
def setup_proxmox_webhook():
    """Run the PVE webhook setup and return its result dict as JSON (HTTP 200)."""
    outcome = setup_pve_webhook_core()
    return jsonify(outcome), 200
def cleanup_pve_webhook_core() -> dict:
    """Core logic to remove PVE webhook blocks. Callable from anywhere.

    Only removes blocks named 'proxmenux-webhook' / 'proxmenux-default'.
    A file is backed up and rewritten only when removing those blocks
    actually changes its content.

    Returns:
        dict with ``cleaned`` (bool) and ``error`` (message or None).
        ``priv_config_warning`` is added when the private config could not
        be rewritten (the main config cleanup still counts as done).
    """
    result = {'cleaned': False, 'error': None}

    try:
        # Read both files
        cfg_text, err = _pve_read_file(_PVE_NOTIFICATIONS_CFG)
        if err:
            result['error'] = err
            return result

        priv_text, err = _pve_read_file(_PVE_PRIV_CFG)
        if err:
            # Priv cleanup is best-effort; carry on with the main config.
            priv_text = None

        # Decide what needs rewriting by running the real removal, so the
        # decision uses the same exact-header matching as the removal itself
        # (a substring test would also fire on names like
        # 'proxmenux-webhook-old', which are never removed).
        cleaned_cfg = _pve_remove_our_blocks(cfg_text, _PVE_OUR_HEADERS)
        cfg_changed = cleaned_cfg != cfg_text

        cleaned_priv = None
        priv_changed = False
        if priv_text is not None:
            cleaned_priv = _pve_remove_our_blocks(priv_text, _PVE_OUR_HEADERS)
            priv_changed = cleaned_priv != priv_text

        if not cfg_changed and not priv_changed:
            # Nothing of ours is present; leave the files untouched.
            result['cleaned'] = True
            return result

        if cfg_changed:
            # Backup before modification
            _pve_backup_file(_PVE_NOTIFICATIONS_CFG)
            try:
                with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                    f.write(cleaned_cfg)
            except PermissionError:
                result['error'] = f'Permission denied writing {_PVE_NOTIFICATIONS_CFG}'
                return result
            except Exception as e:
                # Rollback
                try:
                    with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                        f.write(cfg_text)
                except Exception:
                    pass
                result['error'] = str(e)
                return result

        if priv_changed:
            _pve_backup_file(_PVE_PRIV_CFG)
            try:
                with open(_PVE_PRIV_CFG, 'w') as f:
                    f.write(cleaned_priv)
            except Exception as e:
                # Best-effort, but report it instead of hiding the failure.
                result['priv_config_warning'] = str(e)

        result['cleaned'] = True
        return result

    except Exception as e:
        result['error'] = str(e)
        return result
@notification_bp.route('/api/notifications/proxmox/cleanup-webhook', methods=['POST'])
def cleanup_proxmox_webhook():
    """Run the PVE webhook cleanup and return its result dict as JSON (HTTP 200)."""
    outcome = cleanup_pve_webhook_core()
    return jsonify(outcome), 200
2026-02-19 19:56:20 +01:00
2026-02-19 18:37:42 +01:00
@notification_bp.route('/api/notifications/proxmox/read-cfg', methods=['GET'])
def read_pve_notification_cfg():
    """Diagnostic: dump the PVE notification config files and our backups.

    GET /api/notifications/proxmox/read-cfg

    The JSON response has ``notifications_cfg`` and ``priv_cfg`` entries
    (path, content, size, error) plus a ``backups`` list covering every file
    in /etc/pve and /etc/pve/priv whose name contains 'proxmenux_backup'.
    """
    import os

    def _describe(path):
        """Read one config file into the response entry format."""
        try:
            with open(path, 'r') as fh:
                return {
                    'path': path,
                    'content': fh.read(),
                    'size': os.path.getsize(path),
                    'error': None,
                }
        except FileNotFoundError:
            failure = 'file_not_found'
        except PermissionError:
            failure = 'permission_denied'
        except Exception as exc:
            failure = str(exc)
        return {'path': path, 'content': None, 'size': 0, 'error': failure}

    result = {
        'notifications_cfg': _describe('/etc/pve/notifications.cfg'),
        'priv_cfg': _describe('/etc/pve/priv/notifications.cfg'),
    }

    # Collect the backups created next to each config file.
    backups = []
    for directory in ('/etc/pve', '/etc/pve/priv'):
        try:
            names = sorted(os.listdir(directory))
        except Exception:
            # Unlistable directory: skip it, keep whatever was found so far.
            continue
        for name in names:
            if 'proxmenux_backup' not in name:
                continue
            fpath = os.path.join(directory, name)
            try:
                with open(fpath, 'r') as fh:
                    backups.append({
                        'path': fpath,
                        'content': fh.read(),
                        'size': os.path.getsize(fpath),
                    })
            except Exception:
                backups.append({'path': fpath, 'content': None, 'error': 'read_failed'})

    result['backups'] = backups
    return jsonify(result), 200
@notification_bp.route('/api/notifications/proxmox/restore-cfg', methods=['POST'])
def restore_pve_notification_cfg():
    """Restore both PVE notification config files from our backups.

    POST /api/notifications/proxmox/restore-cfg

    For each config file, the backup whose file name sorts last (names embed
    a YYYYmmdd_HHMMSS stamp) is copied over the target.  The response lists
    what was restored, per-file errors, and an overall ``success`` flag that
    is true only when at least one file was restored and no error occurred.
    """
    import os
    import shutil

    targets = (
        ('/etc/pve', '/etc/pve/notifications.cfg'),
        ('/etc/pve/priv', '/etc/pve/priv/notifications.cfg'),
    )
    restored = []
    errors = []

    for search_dir, target_path in targets:
        try:
            matches = [
                name for name in os.listdir(search_dir)
                if name.startswith('notifications.cfg') and 'proxmenux_backup' in name
            ]
            if not matches:
                errors.append({'target': target_path, 'error': 'no_backup_found'})
                continue
            backup_path = os.path.join(search_dir, max(matches))
            shutil.copy2(backup_path, target_path)
            restored.append({'target': target_path, 'from_backup': backup_path})
        except Exception as exc:
            errors.append({'target': target_path, 'error': str(exc)})

    return jsonify({
        'restored': restored,
        'errors': errors,
        'success': not errors and bool(restored),
    }), 200
2026-02-19 17:02:02 +01:00
@notification_bp.route('/api/notifications/webhook', methods=['POST'])
def proxmox_webhook():
    """Receive native Proxmox VE notification webhooks (hardened).

    Security layers:
      Localhost (127.0.0.1 / ::1): rate limiting only.
        PVE calls us on localhost and cannot send custom auth headers,
        so we trust the loopback interface (only local processes can reach it).
      Remote: rate limiting + shared secret + timestamp + replay + IP allowlist.

    Returns:
        429 when rate limited; 401/403/409/503 when a remote-only check
        fails; 400 for an empty payload; otherwise always 200, with the
        ``accepted`` field of the JSON body carrying the processing status.
    """
    import json

    def _reject(error, status):
        """Build a uniform rejection response."""
        return jsonify({'accepted': False, 'error': error}), status

    def _as_bytes(value):
        """Encode to bytes so compare_digest accepts non-ASCII input."""
        return value if isinstance(value, bytes) else str(value).encode('utf-8')

    # NOTE(review): remote_addr is the direct peer; if this app sits behind a
    # local reverse proxy every request would look like localhost -- confirm
    # the deployment before relying on the loopback exemption.
    client_ip = request.remote_addr or ''
    is_localhost = client_ip in ('127.0.0.1', '::1')

    # ── Layer 1: Rate limiting (always) ──
    if not _webhook_limiter.allow():
        resp = jsonify({'accepted': False, 'error': 'rate_limited'})
        resp.headers['Retry-After'] = '60'
        return resp, 429

    # ── Layers 2-5: Remote-only checks ──
    if not is_localhost:
        # Layer 2: Shared secret
        try:
            configured_secret = notification_manager.get_webhook_secret()
        except Exception:
            # Fail closed: if the secret cannot be looked up we cannot tell
            # whether this caller is authorised, so do not let it through.
            return _reject('secret_unavailable', 503)

        # NOTE(review): when no secret is configured the check is skipped,
        # as before; remote callers are then only subject to layers 3-5.
        if configured_secret:
            request_secret = request.headers.get('X-Webhook-Secret', '')
            if not request_secret:
                return _reject('missing_secret', 401)
            # Compare as bytes: compare_digest raises TypeError on str
            # arguments containing non-ASCII characters.
            if not hmac.compare_digest(_as_bytes(configured_secret), _as_bytes(request_secret)):
                return _reject('invalid_secret', 401)

        # Layer 3: Anti-replay timestamp
        ts_header = request.headers.get('X-ProxMenux-Timestamp', '')
        if not ts_header:
            return _reject('missing_timestamp', 401)
        try:
            ts_value = int(ts_header)
        except (ValueError, TypeError):
            return _reject('invalid_timestamp', 401)
        if abs(time.time() - ts_value) > _TIMESTAMP_MAX_DRIFT:
            return _reject('timestamp_expired', 401)

        # Layer 4: Replay cache (keyed on timestamp + body)
        raw_body = request.get_data(as_text=True) or ''
        signature = hashlib.sha256(f"{ts_value}:{raw_body}".encode(errors='replace')).hexdigest()
        if _replay_cache.check_and_record(signature):
            return _reject('replay_detected', 409)

        # Layer 5: IP allowlist (best-effort: lookup errors do not block)
        try:
            allowed_ips = notification_manager.get_webhook_allowed_ips()
            if allowed_ips and client_ip not in allowed_ips:
                return _reject('forbidden_ip', 403)
        except Exception:
            pass

    # ── Parse and process payload ──
    try:
        raw_data = request.get_data(as_text=True) or ''

        # Try JSON first; only a JSON object is usable as a payload.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        # If not JSON, try form data
        if not payload:
            payload = request.form.to_dict()

        # If still empty, try parsing raw data as JSON (PVE may not set Content-Type)
        if not payload and raw_data:
            try:
                parsed = json.loads(raw_data)
            except ValueError:  # json.JSONDecodeError is a ValueError
                parsed = None
            if isinstance(parsed, dict):
                payload = parsed

        # If still empty, create a minimal test event from raw data
        if not payload:
            if not raw_data:
                return _reject('empty_payload', 400)
            payload = {
                'type': 'webhook_test',
                'title': 'PVE Webhook Test',
                'body': raw_data[:500],
                'severity': 'info',
            }

        result = notification_manager.process_webhook(payload)
        # Always return 200 to PVE -- a non-200 makes PVE report the webhook as broken.
        # The 'accepted' field in the JSON body indicates actual processing status.
        return jsonify(result), 200
    except Exception as e:
        # Still return 200 to avoid PVE flagging the webhook as broken
        return jsonify({'accepted': False, 'error': 'internal_error', 'detail': str(e)}), 200