diff --git a/AppImage/components/notification-settings.tsx b/AppImage/components/notification-settings.tsx index 07e43881..acda234c 100644 --- a/AppImage/components/notification-settings.tsx +++ b/AppImage/components/notification-settings.tsx @@ -12,7 +12,7 @@ import { fetchApi } from "../lib/api-config" import { Bell, BellOff, Send, CheckCircle2, XCircle, Loader2, AlertTriangle, Info, Settings2, Zap, Eye, EyeOff, - Trash2, ChevronDown, ChevronUp, TestTube2 + Trash2, ChevronDown, ChevronUp, TestTube2, Mail, Webhook } from "lucide-react" interface ChannelConfig { @@ -22,6 +22,15 @@ interface ChannelConfig { url?: string token?: string webhook_url?: string + // Email channel fields + host?: string + port?: string + username?: string + password?: string + tls_mode?: string + from_address?: string + to_addresses?: string + subject_prefix?: string } interface NotificationConfig { @@ -34,6 +43,8 @@ interface NotificationConfig { ai_api_key: string ai_model: string hostname: string + webhook_secret: string + webhook_allowed_ips: string } interface ServiceStatus { @@ -84,6 +95,7 @@ const DEFAULT_CONFIG: NotificationConfig = { telegram: { enabled: false }, gotify: { enabled: false }, discord: { enabled: false }, + email: { enabled: false }, }, severity_filter: "warning", event_categories: { @@ -95,6 +107,8 @@ const DEFAULT_CONFIG: NotificationConfig = { ai_api_key: "", ai_model: "", hostname: "", + webhook_secret: "", + webhook_allowed_ips: "", } export function NotificationSettings() { @@ -112,6 +126,11 @@ export function NotificationSettings() { const [editMode, setEditMode] = useState(false) const [hasChanges, setHasChanges] = useState(false) const [originalConfig, setOriginalConfig] = useState(DEFAULT_CONFIG) + const [webhookSetup, setWebhookSetup] = useState<{ + status: "idle" | "running" | "success" | "failed" + fallback_commands: string[] + error: string + }>({ status: "idle", fallback_commands: [], error: "" }) const loadConfig = useCallback(async () => { try { @@ -252,6 +271,184 @@ export function NotificationSettings() { const activeChannels = Object.entries(config.channels).filter(([, ch]) => ch.enabled).length + const handleEnable = async () => { + setSaving(true) + setWebhookSetup({ status: "running", fallback_commands: [], error: "" }) + try { + // 1) Save enabled=true + const newConfig = { ...config, enabled: true } + await fetchApi("/api/notifications/settings", { + method: "POST", + body: JSON.stringify(newConfig), + }) + setConfig(newConfig) + setOriginalConfig(newConfig) + + // 2) Auto-configure PVE webhook + try { + const setup = await fetchApi<{ + configured: boolean + secret?: string + fallback_commands?: string[] + error?: string + }>("/api/notifications/proxmox/setup-webhook", { method: "POST" }) + + if (setup.configured) { + setWebhookSetup({ status: "success", fallback_commands: [], error: "" }) + // Update secret in local config if one was generated + if (setup.secret) { + const updated = { ...newConfig, webhook_secret: setup.secret } + setConfig(updated) + setOriginalConfig(updated) + } + } else { + setWebhookSetup({ + status: "failed", + fallback_commands: setup.fallback_commands || [], + error: setup.error || "Unknown error", + }) + } + } catch { + setWebhookSetup({ + status: "failed", + fallback_commands: [], + error: "Could not reach setup endpoint", + }) + } + + setEditMode(true) + loadStatus() + } catch (err) { + console.error("Failed to enable notifications:", err) + setWebhookSetup({ status: "idle", fallback_commands: [], error: "" }) + } finally { + setSaving(false) + } + } + + // ── Disabled state: show activation card ── + if (!config.enabled && !editMode) { + return ( + + +
+ + Notifications + + Disabled + +
+ + Get real-time alerts about your Proxmox environment via Telegram, Discord, Gotify, or Email. + +
+ +
+
+
+ +
+

Enable notification service

+

+ Monitor system health, VM/CT events, backups, security alerts, and cluster status. + PVE webhook integration is configured automatically. +

+
+
+
+ +
+ + {/* Webhook setup result */} + {webhookSetup.status === "success" && ( +
+ +

+ PVE webhook configured automatically. Proxmox will send notifications to ProxMenux. +

+
+ )} + {webhookSetup.status === "failed" && ( +
+
+ +
+

+ Automatic PVE configuration failed: {webhookSetup.error} +

+

+ Notifications are enabled. Run the commands below on the PVE host to complete webhook setup. +

+
+
+ {webhookSetup.fallback_commands.length > 0 && ( +
+{webhookSetup.fallback_commands.join('\n')}
+                    
+ )} +
+ )} +
+ + {/* PBS manual section (collapsible) */} +
+ + + + Configure PBS notifications (manual) + +
+
+

+ PVE backups launched from the PVE interface are covered automatically by the PVE webhook above. +

+

+ However, PBS has its own internal jobs (Verify, Prune, GC, Sync) that generate + separate notifications. These must be configured directly on the PBS server. +

+
+
+

+ Run on the PBS host: +

+
+{`# Create webhook endpoint on PBS
+proxmox-backup-manager notification endpoint webhook create proxmenux-webhook \\
+  --url http://:8008/api/notifications/webhook \\
+  --header "X-Webhook-Secret="
+
+# Create matcher to route PBS events
+proxmox-backup-manager notification matcher create proxmenux-pbs \\
+  --target proxmenux-webhook \\
+  --match-severity warning,error`}
+                  
+
+
+ +
+

+ {"Replace with the IP address of this PVE node (not 127.0.0.1, unless PBS runs on the same host)."} +

+

+ {"Replace with the webhook secret shown in your notification settings."} +

+
+
+
+
+
+
+
+ ) + } + return ( @@ -302,7 +499,7 @@ export function NotificationSettings() { - Configure notification channels and event filters. Receive alerts via Telegram, Gotify, or Discord. + Configure notification channels and event filters. Receive alerts via Telegram, Gotify, Discord, or Email. @@ -369,7 +566,7 @@ export function NotificationSettings() { - + Telegram @@ -379,6 +576,9 @@ export function NotificationSettings() { Discord + + Email + {/* Telegram */} @@ -571,6 +771,151 @@ export function NotificationSettings() { )} + + {/* Email */} + +
+ + +
+ {config.channels.email?.enabled && ( + <> +
+
+ + updateChannel("email", "host", e.target.value)} + disabled={!editMode} + /> +
+
+ + updateChannel("email", "port", e.target.value)} + disabled={!editMode} + /> +
+
+
+ + +
+
+
+ + updateChannel("email", "username", e.target.value)} + disabled={!editMode} + /> +
+
+ +
+ updateChannel("email", "password", e.target.value)} + disabled={!editMode} + /> + +
+
+
+
+ + updateChannel("email", "from_address", e.target.value)} + disabled={!editMode} + /> +
+
+ + updateChannel("email", "to_addresses", e.target.value)} + disabled={!editMode} + /> +
+
+ + updateChannel("email", "subject_prefix", e.target.value)} + disabled={!editMode} + /> +
+
+ +

+ Leave SMTP Host empty to use local sendmail (must be installed on the server). + For Gmail, use an App Password instead of your account password. +

+
+ {!editMode && config.channels.email?.to_addresses && ( + + )} + + )} +
{/* Test Result */} @@ -647,6 +992,131 @@ export function NotificationSettings() { + {/* ── Proxmox Webhook ── */} +
+
+
+ + Proxmox Webhook +
+ {!editMode && ( + + )} +
+ + {/* Setup status inline */} + {webhookSetup.status === "success" && ( +
+ +

PVE webhook configured successfully.

+
+ )} + {webhookSetup.status === "failed" && ( +
+
+ +

PVE auto-config failed: {webhookSetup.error}

+
+ {webhookSetup.fallback_commands.length > 0 && ( +
+{webhookSetup.fallback_commands.join('\n')}
+                    
+ )} +
+ )} + +
+ +
+ updateConfig(p => ({ ...p, webhook_secret: e.target.value }))} + disabled={!editMode} + /> + +
+

+ {"Proxmox must send this value in the X-Webhook-Secret header. Auto-generated on first enable."} +

+
+
+ + updateConfig(p => ({ ...p, webhook_allowed_ips: e.target.value }))} + disabled={!editMode} + /> +

+ {"Localhost (127.0.0.1) is always allowed. This restricts remote callers only."} +

+
+ + {/* PBS manual guide (collapsible) */} +
+ + + Configure PBS notifications (manual) + +
+

+ Backups launched from PVE are covered by the PVE webhook. PBS internal jobs + (Verify, Prune, GC, Sync) require separate configuration on the PBS server. +

+
+{`# On the PBS host:
+proxmox-backup-manager notification endpoint webhook \\
+  create proxmenux-webhook \\
+  --url http://:8008/api/notifications/webhook \\
+  --header "X-Webhook-Secret="
+
+proxmox-backup-manager notification matcher \\
+  create proxmenux-pbs \\
+  --target proxmenux-webhook \\
+  --match-severity warning,error`}
+                  
+

+ {"Replace with this node's IP and with the webhook secret above."} +

+
+
+
+ {/* ── Advanced: AI Enhancement ── */}
diff --git a/AppImage/scripts/flask_notification_routes.py b/AppImage/scripts/flask_notification_routes.py index 6f37e867..065bbf6e 100644 --- a/AppImage/scripts/flask_notification_routes.py +++ b/AppImage/scripts/flask_notification_routes.py @@ -3,9 +3,64 @@ Flask routes for notification service configuration and management. Blueprint pattern matching flask_health_routes.py / flask_security_routes.py. """ +import hmac +import time +import hashlib +from collections import deque from flask import Blueprint, jsonify, request from notification_manager import notification_manager + +# ─── Webhook Hardening Helpers ─────────────────────────────────── + +class WebhookRateLimiter: + """Simple sliding-window rate limiter for the webhook endpoint.""" + + def __init__(self, max_requests: int = 60, window_seconds: int = 60): + self._max = max_requests + self._window = window_seconds + self._timestamps: deque = deque() + + def allow(self) -> bool: + now = time.time() + # Prune entries outside the window + while self._timestamps and now - self._timestamps[0] > self._window: + self._timestamps.popleft() + if len(self._timestamps) >= self._max: + return False + self._timestamps.append(now) + return True + + +class ReplayCache: + """Bounded in-memory cache of recently seen request signatures (60s TTL).""" + + _MAX_SIZE = 2000 # Hard cap to prevent memory growth + + def __init__(self, ttl: int = 60): + self._ttl = ttl + self._seen: dict = {} # signature -> timestamp + + def check_and_record(self, signature: str) -> bool: + """Return True if this signature was already seen (replay). Records it otherwise.""" + now = time.time() + # Periodic cleanup + if len(self._seen) > self._MAX_SIZE // 2: + cutoff = now - self._ttl + self._seen = {k: v for k, v in self._seen.items() if v > cutoff} + if signature in self._seen and now - self._seen[signature] < self._ttl: + return True # Replay detected + self._seen[signature] = now + return False + + +# Module-level singletons (one per process) +_webhook_limiter = WebhookRateLimiter(max_requests=60, window_seconds=60) +_replay_cache = ReplayCache(ttl=60) + +# Timestamp validation window (seconds) +_TIMESTAMP_MAX_DRIFT = 60 + notification_bp = Blueprint('notifications', __name__) @@ -100,3 +155,218 @@ def send_notification(): return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 + + +@notification_bp.route('/api/notifications/proxmox/setup-webhook', methods=['POST']) +def setup_proxmox_webhook(): + """Automatically configure PVE notifications to call our webhook. + + Idempotent: safe to call multiple times. Only creates/updates + ProxMenux-owned objects (proxmenux-webhook endpoint, proxmenux-default matcher). + Never deletes or overrides user notification targets. + """ + import subprocess + import secrets as secrets_mod + + ENDPOINT_ID = 'proxmenux-webhook' + MATCHER_ID = 'proxmenux-default' + WEBHOOK_URL = 'http://127.0.0.1:8008/api/notifications/webhook' + + result = { + 'configured': False, + 'endpoint_id': ENDPOINT_ID, + 'matcher_id': MATCHER_ID, + 'url': WEBHOOK_URL, + 'fallback_commands': [], + 'error': None, + } + + def _run_pvesh(args: list, check: bool = True) -> tuple: + """Run pvesh command. Returns (success, stdout, stderr).""" + try: + proc = subprocess.run( + ['pvesh'] + args, + capture_output=True, text=True, timeout=15 + ) + return proc.returncode == 0, proc.stdout.strip(), proc.stderr.strip() + except FileNotFoundError: + return False, '', 'pvesh not found' + except subprocess.TimeoutExpired: + return False, '', 'pvesh timed out' + except Exception as e: + return False, '', str(e) + + try: + # Step 1: Ensure webhook secret exists + secret = notification_manager.get_webhook_secret() + if not secret: + secret = secrets_mod.token_urlsafe(32) + notification_manager._save_setting('webhook_secret', secret) + + secret_header = f'X-Webhook-Secret={secret}' + + # Step 2: Check if endpoint already exists + exists_ok, _, _ = _run_pvesh([ + 'get', f'/cluster/notifications/endpoints/webhook/{ENDPOINT_ID}', + '--output-format', 'json' + ]) + + if exists_ok: + # Update existing endpoint + ok, _, err = _run_pvesh([ + 'set', f'/cluster/notifications/endpoints/webhook/{ENDPOINT_ID}', + '--url', WEBHOOK_URL, + '--method', 'post', + '--header', secret_header, + ]) + else: + # Create new endpoint + ok, _, err = _run_pvesh([ + 'create', '/cluster/notifications/endpoints/webhook', + '--name', ENDPOINT_ID, + '--url', WEBHOOK_URL, + '--method', 'post', + '--header', secret_header, + ]) + + if not ok: + # Build fallback commands for manual execution + result['fallback_commands'] = [ + f'pvesh create /cluster/notifications/endpoints/webhook ' + f'--name {ENDPOINT_ID} --url {WEBHOOK_URL} --method post ' + f'--header "{secret_header}"', + f'pvesh create /cluster/notifications/matchers ' + f'--name {MATCHER_ID} --target {ENDPOINT_ID} ' + f'--match-severity warning,error', + ] + result['error'] = f'Failed to configure endpoint: {err}' + return jsonify(result), 200 + + # Step 3: Create or update matcher + matcher_exists, _, _ = _run_pvesh([ + 'get', f'/cluster/notifications/matchers/{MATCHER_ID}', + '--output-format', 'json' + ]) + + if matcher_exists: + ok_m, _, err_m = _run_pvesh([ + 'set', f'/cluster/notifications/matchers/{MATCHER_ID}', + '--target', ENDPOINT_ID, + '--match-severity', 'warning,error', + ]) + else: + ok_m, _, err_m = _run_pvesh([ + 'create', '/cluster/notifications/matchers', + '--name', MATCHER_ID, + '--target', ENDPOINT_ID, + '--match-severity', 'warning,error', + ]) + + if not ok_m: + result['fallback_commands'] = [ + f'pvesh create /cluster/notifications/matchers ' + f'--name {MATCHER_ID} --target {ENDPOINT_ID} ' + f'--match-severity warning,error', + ] + result['error'] = f'Endpoint OK, but matcher failed: {err_m}' + result['configured'] = False + return jsonify(result), 200 + + result['configured'] = True + result['secret'] = secret # Return so UI can display it + return jsonify(result), 200 + + except Exception as e: + result['error'] = str(e) + result['fallback_commands'] = [ + f'pvesh create /cluster/notifications/endpoints/webhook ' + f'--name {ENDPOINT_ID} --url {WEBHOOK_URL} --method post ' + f'--header "X-Webhook-Secret=YOUR_SECRET"', + f'pvesh create /cluster/notifications/matchers ' + f'--name {MATCHER_ID} --target {ENDPOINT_ID} ' + f'--match-severity warning,error', + ] + return jsonify(result), 200 + + +@notification_bp.route('/api/notifications/webhook', methods=['POST']) +def proxmox_webhook(): + """Receive native Proxmox VE notification webhooks (hardened). + + Security layers: + 1. Rate limiting (60 req/min) -- always + 2. Shared secret (X-Webhook-Secret) -- always required + 3. Anti-replay timestamp (60s window) -- remote only + 4. Replay cache (signature dedup) -- remote only + 5. IP allowlist (optional) -- remote only + + Localhost callers (127.0.0.1 / ::1) bypass layers 3-5 because Proxmox + cannot inject dynamic timestamp headers. The shared secret is still + required for localhost to prevent any local process from injecting events. + """ + _reject = lambda code, error, status: (jsonify({'accepted': False, 'error': error}), status) + + client_ip = request.remote_addr or '' + is_localhost = client_ip in ('127.0.0.1', '::1') + + # ── Layer 1: Rate limiting (always) ── + if not _webhook_limiter.allow(): + resp = jsonify({'accepted': False, 'error': 'rate_limited'}) + resp.headers['Retry-After'] = '60' + return resp, 429 + + # ── Layer 2: Shared secret (always required) ── + try: + configured_secret = notification_manager.get_webhook_secret() + except Exception: + configured_secret = '' + + if not configured_secret: + return _reject(500, 'webhook_not_configured', 500) + + request_secret = request.headers.get('X-Webhook-Secret', '') + if not request_secret: + return _reject(401, 'missing_secret', 401) + if not hmac.compare_digest(configured_secret, request_secret): + return _reject(401, 'invalid_secret', 401) + + # ── Layers 3-5: Remote-only checks ── + if not is_localhost: + # Layer 3: Anti-replay timestamp + ts_header = request.headers.get('X-ProxMenux-Timestamp', '') + if not ts_header: + return _reject(401, 'missing_timestamp', 401) + try: + ts_value = int(ts_header) + except (ValueError, TypeError): + return _reject(401, 'invalid_timestamp', 401) + if abs(time.time() - ts_value) > _TIMESTAMP_MAX_DRIFT: + return _reject(401, 'timestamp_expired', 401) + + # Layer 4: Replay cache + raw_body = request.get_data(as_text=True) or '' + signature = hashlib.sha256(f"{ts_value}:{raw_body}".encode(errors='replace')).hexdigest() + if _replay_cache.check_and_record(signature): + return _reject(409, 'replay_detected', 409) + + # Layer 5: IP allowlist + try: + allowed_ips = notification_manager.get_webhook_allowed_ips() + if allowed_ips and client_ip not in allowed_ips: + return _reject(403, 'forbidden_ip', 403) + except Exception: + pass + + # ── Parse and process payload ── + try: + payload = request.get_json(silent=True) or {} + if not payload: + payload = dict(request.form) + if not payload: + return _reject(400, 'invalid_payload', 400) + + result = notification_manager.process_webhook(payload) + status_code = 200 if result.get('accepted') else 400 + return jsonify(result), status_code + except Exception: + return jsonify({'accepted': False, 'error': 'internal_error'}), 500 diff --git a/AppImage/scripts/health_persistence.py b/AppImage/scripts/health_persistence.py index 1bf3a78f..d91b98c9 100644 --- a/AppImage/scripts/health_persistence.py +++ b/AppImage/scripts/health_persistence.py @@ -130,6 +130,15 @@ class HealthPersistence: ) ''') + # Notification cooldown persistence (survives restarts) + cursor.execute(''' + CREATE TABLE IF NOT EXISTS notification_last_sent ( + fingerprint TEXT PRIMARY KEY, + last_sent_ts INTEGER NOT NULL, + count INTEGER DEFAULT 1 + ) + ''') + # Migration: add suppression_hours column to errors if not present cursor.execute("PRAGMA table_info(errors)") columns = [col[1] for col in cursor.fetchall()] @@ -143,6 +152,7 @@ class HealthPersistence: cursor.execute('CREATE INDEX IF NOT EXISTS idx_events_error ON events(error_key)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_notif_sent_at ON notification_history(sent_at)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_notif_severity ON notification_history(severity)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_nls_ts ON notification_last_sent(last_sent_ts)') conn.commit() conn.close() diff --git a/AppImage/scripts/notification_channels.py b/AppImage/scripts/notification_channels.py index f1c2e1f2..79393e50 100644 --- a/AppImage/scripts/notification_channels.py +++ b/AppImage/scripts/notification_channels.py @@ -311,7 +311,14 @@ class DiscordChannel(NotificationChannel): 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), } - if data: + # Use structured fields from render_template if available + rendered_fields = (data or {}).get('_rendered_fields', []) + if rendered_fields: + embed['fields'] = [ + {'name': name, 'value': val[:1024], 'inline': True} + for name, val in rendered_fields[:25] # Discord limit: 25 fields + ] + elif data: fields = [] if data.get('category'): fields.append({'name': 'Category', 'value': data['category'], 'inline': True}) @@ -351,6 +358,164 @@ class DiscordChannel(NotificationChannel): ) +# ─── Email Channel ────────────────────────────────────────────── + +class EmailChannel(NotificationChannel): + """Email notification channel using SMTP (smtplib) or sendmail fallback. + + Config keys: + host, port, username, password, tls_mode (none|starttls|ssl), + from_address, to_addresses (comma-separated), subject_prefix, timeout + """ + + def __init__(self, config: Dict[str, str]): + super().__init__() + self.host = config.get('host', '') + self.port = int(config.get('port', 587) or 587) + self.username = config.get('username', '') + self.password = config.get('password', '') + self.tls_mode = config.get('tls_mode', 'starttls') # none | starttls | ssl + self.from_address = config.get('from_address', '') + self.to_addresses = self._parse_recipients(config.get('to_addresses', '')) + self.subject_prefix = config.get('subject_prefix', '[ProxMenux]') + self.timeout = int(config.get('timeout', 10) or 10) + + @staticmethod + def _parse_recipients(raw) -> list: + if isinstance(raw, list): + return [a.strip() for a in raw if a.strip()] + return [addr.strip() for addr in str(raw).split(',') if addr.strip()] + + def validate_config(self) -> Tuple[bool, str]: + if not self.to_addresses: + return False, 'No recipients configured' + if not self.from_address: + return False, 'No from address configured' + # Must have SMTP host OR local sendmail available + if not self.host: + import os + if not os.path.exists('/usr/sbin/sendmail'): + return False, 'No SMTP host configured and /usr/sbin/sendmail not found' + return True, '' + + def send(self, title: str, message: str, severity: str = 'INFO', + data: Optional[Dict] = None) -> Dict[str, Any]: + subject = f"{self.subject_prefix} [{severity}] {title}" + + def _do_send(): + if self.host: + return self._send_smtp(subject, message, severity) + else: + return self._send_sendmail(subject, message, severity) + + return self._send_with_retry(_do_send) + + def _send_smtp(self, subject: str, body: str, severity: str) -> Tuple[int, str]: + import smtplib + from email.message import EmailMessage + + msg = EmailMessage() + msg['Subject'] = subject + msg['From'] = self.from_address + msg['To'] = ', '.join(self.to_addresses) + msg.set_content(body) + + # Add HTML alternative + html_body = self._format_html(subject, body, severity) + if html_body: + msg.add_alternative(html_body, subtype='html') + + try: + if self.tls_mode == 'ssl': + server = smtplib.SMTP_SSL(self.host, self.port, timeout=self.timeout) + else: + server = smtplib.SMTP(self.host, self.port, timeout=self.timeout) + if self.tls_mode == 'starttls': + server.starttls() + + if self.username and self.password: + server.login(self.username, self.password) + + server.send_message(msg) + server.quit() + return 200, 'OK' + except smtplib.SMTPAuthenticationError as e: + return 0, f'SMTP authentication failed: {e}' + except smtplib.SMTPConnectError as e: + return 0, f'SMTP connection failed: {e}' + except smtplib.SMTPException as e: + return 0, f'SMTP error: {e}' + except (OSError, TimeoutError) as e: + return 0, f'Connection error: {e}' + + def _send_sendmail(self, subject: str, body: str, severity: str) -> Tuple[int, str]: + import os + import subprocess + from email.message import EmailMessage + + sendmail = '/usr/sbin/sendmail' + if not os.path.exists(sendmail): + return 0, 'sendmail not found at /usr/sbin/sendmail' + + msg = EmailMessage() + msg['Subject'] = subject + msg['From'] = self.from_address or 'proxmenux@localhost' + msg['To'] = ', '.join(self.to_addresses) + msg.set_content(body) + + try: + proc = subprocess.run( + [sendmail, '-t', '-oi'], + input=msg.as_string(), capture_output=True, text=True, timeout=30 + ) + if proc.returncode == 0: + return 200, 'OK' + return 0, f'sendmail failed (rc={proc.returncode}): {proc.stderr[:200]}' + except subprocess.TimeoutExpired: + return 0, 'sendmail timed out after 30s' + except Exception as e: + return 0, f'sendmail error: {e}' + + @staticmethod + def _format_html(subject: str, body: str, severity: str) -> str: + """Create professional HTML email.""" + import html as html_mod + + severity_colors = {'CRITICAL': '#dc2626', 'WARNING': '#f59e0b', 'INFO': '#3b82f6'} + color = severity_colors.get(severity, '#6b7280') + + body_html = ''.join( + f'

{html_mod.escape(line)}

' + for line in body.split('\n') if line.strip() + ) + + return f''' + +
+
+

ProxMenux Monitor

+

{html_mod.escape(severity)} Alert

+
+
+

{html_mod.escape(subject)}

+ {body_html} +
+
+

Sent by ProxMenux Notification Service

+
+
+''' + + def test(self) -> Tuple[bool, str]: + result = self.send( + 'ProxMenux Test Notification', + 'This is a test notification from ProxMenux Monitor.\n' + 'If you received this, your email channel is working correctly.', + 'INFO' + ) + return result.get('success', False), result.get('error', '') + + # ─── Channel Factory ───────────────────────────────────────────── CHANNEL_TYPES = { @@ -369,6 +534,12 @@ CHANNEL_TYPES = { 'config_keys': ['webhook_url'], 'class': DiscordChannel, }, + 'email': { + 'name': 'Email (SMTP)', + 'config_keys': ['host', 'port', 'username', 'password', 'tls_mode', + 'from_address', 'to_addresses', 'subject_prefix'], + 'class': EmailChannel, + }, } @@ -397,6 +568,8 @@ def create_channel(channel_type: str, config: Dict[str, str]) -> Optional[Notifi return DiscordChannel( webhook_url=config.get('webhook_url', '') ) + elif channel_type == 'email': + return EmailChannel(config) except Exception as e: print(f"[NotificationChannels] Failed to create {channel_type}: {e}") return None diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py index 0487344b..3d9a9964 100644 --- a/AppImage/scripts/notification_events.py +++ b/AppImage/scripts/notification_events.py @@ -16,32 +16,70 @@ import os import re import json import time +import hashlib import socket import subprocess import threading from queue import Queue -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Tuple from pathlib import Path # ─── Event Object ───────────────────────────────────────────────── class NotificationEvent: - """Represents a detected event ready for notification dispatch.""" + """Represents a detected event ready for notification dispatch. - __slots__ = ('event_type', 'severity', 'data', 'timestamp', 'source') + Fields: + event_type: Taxonomy key (e.g. 'vm_fail', 'auth_fail', 'split_brain') + severity: INFO | WARNING | CRITICAL + data: Payload dict with context (hostname, vmid, reason, etc.) + source: Origin: journal | tasks | health | proxmox_hook | cli | api | polling + entity: What is affected: node | vm | ct | storage | disk | network | cluster | user + entity_id: Specific identifier (vmid, IP, device, pool, interface, etc.) + raw: Original payload (webhook JSON or log line), optional + fingerprint: Stable dedup key: hostname:entity:entity_id:event_type + event_id: Short hash of fingerprint for correlation + ts_epoch: time.time() at creation + ts_monotonic: time.monotonic() at creation (drift-safe for cooldown) + """ + + __slots__ = ( + 'event_type', 'severity', 'data', 'timestamp', 'source', + 'entity', 'entity_id', 'raw', + 'fingerprint', 'event_id', 'ts_epoch', 'ts_monotonic', + ) def __init__(self, event_type: str, severity: str = 'INFO', data: Optional[Dict[str, Any]] = None, - source: str = 'watcher'): + source: str = 'watcher', + entity: str = 'node', entity_id: str = '', + raw: Any = None): self.event_type = event_type self.severity = severity self.data = data or {} - self.timestamp = time.time() self.source = source + self.entity = entity + self.entity_id = entity_id + self.raw = raw + self.ts_epoch = time.time() + self.ts_monotonic = time.monotonic() + self.timestamp = self.ts_epoch # backward compat + + # Build fingerprint for dedup/cooldown + hostname = self.data.get('hostname', _hostname()) + if entity_id: + fp_base = f"{hostname}:{entity}:{entity_id}:{event_type}" + else: + # When entity_id is empty, include a hash of title/body for uniqueness + reason = self.data.get('reason', self.data.get('title', '')) + stable_extra = hashlib.md5(reason.encode(errors='replace')).hexdigest()[:8] if reason else '' + fp_base = f"{hostname}:{entity}:{event_type}:{stable_extra}" + self.fingerprint = fp_base + self.event_id = hashlib.md5(fp_base.encode()).hexdigest()[:12] def __repr__(self): - return f"NotificationEvent({self.event_type}, {self.severity})" + return f"NotificationEvent({self.event_type}, {self.severity}, fp={self.fingerprint[:40]})" def _hostname() -> str: @@ -186,7 +224,7 @@ class JournalWatcher: 'username': username, 'service': service, 'hostname': self._hostname, - }) + }, entity='user', entity_id=source_ip) return def _check_fail2ban(self, msg: str, syslog_id: str): @@ -206,7 +244,7 @@ class JournalWatcher: 'jail': jail, 'failures': '', 'hostname': self._hostname, - }) + }, entity='user', entity_id=ip) def _check_kernel_critical(self, msg: str, syslog_id: str, priority: int): """Detect kernel panics, OOM, segfaults, hardware errors.""" @@ -227,13 +265,17 @@ class JournalWatcher: for pattern, (event_type, severity, reason) in critical_patterns.items(): if re.search(pattern, msg, re.IGNORECASE): data = {'reason': reason, 'hostname': self._hostname} + entity = 'node' + entity_id = '' # Try to extract device for disk errors dev_match = re.search(r'dev\s+(\S+)', msg) if dev_match and event_type == 'disk_io_error': data['device'] = dev_match.group(1) + entity = 'disk' + entity_id = dev_match.group(1) - self._emit(event_type, severity, data) + self._emit(event_type, severity, data, entity=entity, entity_id=entity_id) return def _check_service_failure(self, msg: str, unit: str): @@ -252,7 +294,7 @@ class JournalWatcher: 'service_name': service_name, 'reason': msg[:200], 'hostname': self._hostname, - }) + }, entity='node', entity_id=service_name) return def _check_disk_io(self, msg: str, syslog_id: str, priority: int): @@ -275,7 +317,7 @@ class JournalWatcher: 'device': device, 'reason': msg[:200], 'hostname': self._hostname, - }) + }, entity='disk', entity_id=device) return def _check_cluster_events(self, msg: str, syslog_id: str): @@ -293,7 +335,7 @@ class JournalWatcher: 'quorum': quorum, 'reason': msg[:200], 'hostname': self._hostname, - }) + }, entity='cluster', entity_id=self._hostname) return # Node disconnect @@ -306,7 +348,7 @@ class JournalWatcher: self._emit('node_disconnect', 'CRITICAL', { 'node_name': node_name, 'hostname': self._hostname, - }) + }, entity='cluster', entity_id=node_name) def _check_system_shutdown(self, msg: str, syslog_id: str): """Detect system shutdown/reboot.""" @@ -315,13 +357,13 @@ class JournalWatcher: self._emit('system_shutdown', 'WARNING', { 'reason': 'System journal stopped', 'hostname': self._hostname, - }) + }, entity='node', entity_id='') elif 'Shutting down' in msg or 'System is rebooting' in msg: event = 'system_reboot' if 'reboot' in msg.lower() else 'system_shutdown' self._emit(event, 'WARNING', { 'reason': msg[:200], 'hostname': self._hostname, - }) + }, entity='node', entity_id='') def _check_permission_change(self, msg: str, syslog_id: str): """Detect user permission changes in PVE.""" @@ -341,7 +383,7 @@ class JournalWatcher: 'username': username, 'change_details': action, 'hostname': self._hostname, - }) + }, entity='user', entity_id=username) return def _check_firewall(self, msg: str, syslog_id: str): @@ -350,20 +392,24 @@ class JournalWatcher: self._emit('firewall_issue', 'WARNING', { 'reason': msg[:200], 'hostname': self._hostname, - }) + }, entity='network', entity_id='') # ── Emit helper ── - def _emit(self, event_type: str, severity: str, data: Dict): - """Emit event to queue with deduplication.""" - dedup_key = f"{event_type}:{data.get('source_ip', '')}:{data.get('device', '')}:{data.get('service_name', '')}" + def _emit(self, event_type: str, severity: str, data: Dict, + entity: str = 'node', entity_id: str = ''): + """Emit event to queue with short-term deduplication (30s window).""" + event = NotificationEvent( + event_type, severity, data, source='journal', + entity=entity, entity_id=entity_id, + ) now = time.time() - last = self._recent_events.get(dedup_key, 0) + last = self._recent_events.get(event.fingerprint, 0) if now - last < self._dedup_window: - return # Skip duplicate + return # Skip duplicate within 30s window - self._recent_events[dedup_key] = now + self._recent_events[event.fingerprint] = now # Cleanup old dedup entries periodically if len(self._recent_events) > 200: @@ -372,7 +418,7 @@ class JournalWatcher: k: v for k, v in self._recent_events.items() if v > cutoff } - self._queue.put(NotificationEvent(event_type, severity, data, source='journal')) + self._queue.put(event) # ─── Task Watcher (Real-time) ──────────────────────────────────── @@ -522,7 +568,12 @@ class TaskWatcher: 'snapshot_name': '', } - self._queue.put(NotificationEvent(event_type, severity, data, source='task')) + # Determine entity type from task type + entity = 'ct' if task_type.startswith('vz') else 'vm' + self._queue.put(NotificationEvent( + event_type, severity, data, source='tasks', + entity=entity, entity_id=vmid, + )) def _get_vm_name(self, vmid: str) -> str: """Try to resolve VMID to name via config files.""" @@ -628,8 +679,18 @@ class PollingCollector: data['hostname'] = self._hostname data['error_key'] = evt.get('error_key', '') + # Deduce entity from health category + category = data.get('category', '') + entity_map = { + 'cpu': ('node', ''), 'memory': ('node', ''), + 'disk': ('storage', ''), 'network': ('network', ''), + 'pve_services': ('node', ''), 'security': ('user', ''), + 'updates': ('node', ''), 'storage': ('storage', ''), + } + entity, eid = entity_map.get(category, ('node', '')) self._queue.put(NotificationEvent( - event_type, severity, data, source='health_monitor' + event_type, severity, data, source='health', + entity=entity, entity_id=eid or data.get('error_key', ''), )) # Mark events as notified @@ -641,14 +702,18 @@ class PollingCollector: # Also check unnotified errors unnotified = health_persistence.get_unnotified_errors() for error in unnotified: + err_cat = error.get('category', '') + e_entity, e_eid = entity_map.get(err_cat, ('node', '')) self._queue.put(NotificationEvent( 'new_error', error.get('severity', 'WARNING'), { - 'category': error.get('category', ''), + 'category': err_cat, 'reason': error.get('reason', ''), 'hostname': self._hostname, 'error_key': error.get('error_key', ''), }, - source='health_monitor' + source='health', + entity=e_entity, + entity_id=e_eid or error.get('error_key', ''), )) # Mark as notified if 'id' in error: @@ -692,7 +757,139 @@ class PollingCollector: 'details': details, 'hostname': self._hostname, }, - source='polling' + source='polling', + entity='node', entity_id='', )) except Exception: pass # Non-critical, silently skip + + +# ─── Proxmox Webhook Receiver ─────────────────────────────────── + +class ProxmoxHookWatcher: + """Receives native Proxmox VE notifications via local webhook endpoint. + + Proxmox can be configured to send notifications to a webhook target: + pvesh create /cluster/notifications/endpoints/webhook/proxmenux \\ + --url http://127.0.0.1:8008/api/notifications/webhook \\ + --method POST + + Payload varies by source (storage, replication, cluster, PBS, apt). + This class normalizes them into NotificationEvent objects. + """ + + def __init__(self, event_queue: Queue): + self._queue = event_queue + self._hostname = _hostname() + + def process_webhook(self, payload: dict) -> dict: + """Process an incoming Proxmox webhook payload. + + Returns: {'accepted': bool, 'event_type': str, 'event_id': str} + or {'accepted': False, 'error': str} + """ + if not payload: + return {'accepted': False, 'error': 'Empty payload'} + + # Extract common fields from PVE notification payload + notification_type = payload.get('type', payload.get('notification-type', '')) + severity_raw = payload.get('severity', payload.get('priority', 'info')) + title = payload.get('title', payload.get('subject', '')) + body = payload.get('body', payload.get('message', '')) + source_component = payload.get('component', payload.get('source', '')) + + # Map to our event taxonomy + event_type, entity, entity_id = self._classify( + notification_type, source_component, title, body, payload + ) + severity = self._map_severity(severity_raw) + + data = { + 'hostname': self._hostname, + 'reason': body[:500] if body else title, + 'title': title, + 'source_component': source_component, + 'notification_type': notification_type, + } + # Merge extra fields from payload + for key in ('vmid', 'node', 'storage', 'device', 'pool'): + if key in payload: + data[key] = str(payload[key]) + + event = NotificationEvent( + event_type=event_type, + severity=severity, + data=data, + source='proxmox_hook', + entity=entity, + entity_id=entity_id, + raw=payload, + ) + + self._queue.put(event) + return {'accepted': True, 'event_type': event_type, 'event_id': event.event_id} + + def _classify(self, ntype: str, component: str, title: str, + body: str, payload: dict) -> tuple: + """Classify webhook payload into (event_type, entity, entity_id).""" + title_lower = (title or '').lower() + body_lower = (body or '').lower() + component_lower = (component or '').lower() + + # Storage / SMART / ZFS / Ceph + if any(k in component_lower for k in ('smart', 'disk', 'zfs', 'ceph')): + entity_id = payload.get('device', payload.get('pool', '')) + if 'smart' in title_lower or 'smart' in body_lower: + return 'disk_io_error', 'disk', str(entity_id) + if 'zfs' in title_lower: + return 'disk_io_error', 'storage', str(entity_id) + return 'disk_space_low', 'storage', str(entity_id) + + # Replication + if 'replication' in component_lower or 'replication' in title_lower: + vmid = str(payload.get('vmid', '')) + if 'fail' in title_lower or 'error' in body_lower: + return 'vm_fail', 'vm', vmid + return 'migration_complete', 'vm', vmid + + # PBS (Proxmox Backup Server) + if 'pbs' in component_lower or 'backup' in component_lower: + vmid = str(payload.get('vmid', '')) + if 'fail' in title_lower or 'error' in body_lower: + return 'backup_fail', 'vm', vmid + if 'complete' in title_lower or 'success' in body_lower: + return 'backup_complete', 'vm', vmid + return 'backup_start', 'vm', vmid + + # Cluster / HA / Fencing / Corosync + if any(k in component_lower for k in ('cluster', 'ha', 'fencing', 'corosync')): + node = str(payload.get('node', '')) + if 'quorum' in title_lower or 'split' in body_lower: + return 'split_brain', 'cluster', node + if 'fencing' in title_lower: + return 'node_disconnect', 'cluster', node + return 'node_disconnect', 'cluster', node + + # APT / Updates + if 'apt' in component_lower or 'update' in title_lower: + return 'update_available', 'node', '' + + # Network + if 'network' in component_lower: + return 'network_down', 'network', '' + + # Security + if any(k in component_lower for k in ('auth', 'firewall', 'security')): + return 'auth_fail', 'user', '' + + # Fallback: system_problem generic + return 'system_problem', 'node', '' + + @staticmethod + def _map_severity(raw: str) -> str: + raw_l = str(raw).lower() + if raw_l in ('critical', 'emergency', 'alert', 'crit', 'err', 'error'): + return 'CRITICAL' + if raw_l in ('warning', 'warn'): + return 'WARNING' + return 'INFO' diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py index 19704cb1..e8b5335c 100644 --- a/AppImage/scripts/notification_manager.py +++ b/AppImage/scripts/notification_manager.py @@ -39,7 +39,8 @@ from notification_templates import ( EVENT_GROUPS, get_event_types_by_group, get_default_enabled_events ) from notification_events import ( - JournalWatcher, TaskWatcher, PollingCollector, NotificationEvent + JournalWatcher, TaskWatcher, PollingCollector, NotificationEvent, + ProxmoxHookWatcher, ) @@ -50,7 +51,7 @@ SETTINGS_PREFIX = 'notification.' # Cooldown defaults (seconds) DEFAULT_COOLDOWNS = { - 'CRITICAL': 0, # No cooldown for critical + 'CRITICAL': 60, # 60s minimum (prevents storm, delivers fast) 'WARNING': 300, # 5 min 'INFO': 900, # 15 min 'resources': 900, # 15 min for resource alerts @@ -58,6 +59,191 @@ DEFAULT_COOLDOWNS = { } +# ─── Storm Protection ──────────────────────────────────────────── + +GROUP_RATE_LIMITS = { + 'security': {'max_per_minute': 5, 'max_per_hour': 30}, + 'storage': {'max_per_minute': 3, 'max_per_hour': 20}, + 'cluster': {'max_per_minute': 5, 'max_per_hour': 20}, + 'network': {'max_per_minute': 3, 'max_per_hour': 15}, + 'resources': {'max_per_minute': 3, 'max_per_hour': 20}, + 'vm_ct': {'max_per_minute': 10, 'max_per_hour': 60}, + 'backup': {'max_per_minute': 5, 'max_per_hour': 30}, + 'system': {'max_per_minute': 5, 'max_per_hour': 30}, +} + + +class GroupRateLimiter: + """Rate limiter per event group. Prevents notification storms.""" + + def __init__(self): + from collections import deque + self._deque = deque + self._minute_counts: Dict[str, Any] = {} # group -> deque[timestamp] + self._hour_counts: Dict[str, Any] = {} # group -> deque[timestamp] + + def allow(self, group: str) -> bool: + """Check if group rate limit allows this event.""" + limits = GROUP_RATE_LIMITS.get(group, GROUP_RATE_LIMITS['system']) + now = time.time() + + # Initialize if needed + if group not in self._minute_counts: + self._minute_counts[group] = self._deque() + self._hour_counts[group] = self._deque() + + # Prune old entries + minute_q = self._minute_counts[group] + hour_q = self._hour_counts[group] + while minute_q and now - minute_q[0] > 60: + minute_q.popleft() + while hour_q and now - hour_q[0] > 3600: + hour_q.popleft() + + # Check limits + if len(minute_q) >= limits['max_per_minute']: + return False + if len(hour_q) >= limits['max_per_hour']: + return False + + # Record + minute_q.append(now) + hour_q.append(now) + return True + + def get_stats(self) -> Dict[str, Dict[str, int]]: + """Return current rate stats per group.""" + now = time.time() + stats = {} + for group in self._minute_counts: + minute_q = self._minute_counts.get(group, []) + hour_q = self._hour_counts.get(group, []) + stats[group] = { + 'last_minute': sum(1 for t in minute_q if now - t <= 60), + 'last_hour': sum(1 for t in hour_q if now - t <= 3600), + } + return stats + + +AGGREGATION_RULES = { + 'auth_fail': {'window': 120, 'min_count': 3, 'burst_type': 'burst_auth_fail'}, + 'ip_block': {'window': 120, 'min_count': 3, 'burst_type': 'burst_ip_block'}, + 'disk_io_error': {'window': 60, 'min_count': 3, 'burst_type': 'burst_disk_io'}, + 'split_brain': {'window': 300, 'min_count': 2, 'burst_type': 'burst_cluster'}, + 'node_disconnect': {'window': 300, 'min_count': 2, 'burst_type': 'burst_cluster'}, +} + + +class BurstAggregator: + """Accumulates similar events in a time window, then sends a single summary. + + Examples: + - "Fail2Ban banned 17 IPs in 2 minutes" + - "Disk I/O errors: 34 events on /dev/sdb in 60s" + """ + + def __init__(self): + self._buckets: Dict[str, List] = {} # bucket_key -> [events] + self._deadlines: Dict[str, float] = {} # bucket_key -> flush_deadline + self._lock = threading.Lock() + + def ingest(self, event: NotificationEvent) -> Optional[NotificationEvent]: + """Add event to aggregation. Returns: + - None if event is being buffered (wait for window) + - Original event if not eligible for aggregation + """ + rule = AGGREGATION_RULES.get(event.event_type) + if not rule: + return event # Not aggregable, pass through + + bucket_key = f"{event.event_type}:{event.data.get('hostname', '')}" + + with self._lock: + if bucket_key not in self._buckets: + self._buckets[bucket_key] = [] + self._deadlines[bucket_key] = time.time() + rule['window'] + + self._buckets[bucket_key].append(event) + + # First event in bucket: pass through immediately so user gets fast alert + if len(self._buckets[bucket_key]) == 1: + return event + + # Subsequent events: buffer (will be flushed as summary) + return None + + def flush_expired(self) -> List[NotificationEvent]: + """Flush all buckets past their deadline. Returns summary events.""" + now = time.time() + summaries = [] + + with self._lock: + expired_keys = [k for k, d in self._deadlines.items() if now >= d] + + for key in expired_keys: + events = self._buckets.pop(key, []) + del self._deadlines[key] + + if len(events) < 2: + continue # Single event already sent on ingest, no summary needed + + rule_type = key.split(':')[0] + rule = AGGREGATION_RULES.get(rule_type, {}) + min_count = rule.get('min_count', 2) + + if len(events) < min_count: + continue # Not enough events for a summary + + summary = self._create_summary(events, rule) + if summary: + summaries.append(summary) + + return summaries + + def _create_summary(self, events: List[NotificationEvent], + rule: dict) -> Optional[NotificationEvent]: + """Create a single summary event from multiple events.""" + if not events: + return None + + first = events[0] + # Determine highest severity + sev_order = {'INFO': 0, 'WARNING': 1, 'CRITICAL': 2} + max_severity = max(events, key=lambda e: sev_order.get(e.severity, 0)).severity + + # Collect unique entity_ids + entity_ids = list(set(e.entity_id for e in events if e.entity_id)) + entity_list = ', '.join(entity_ids[:10]) if entity_ids else 'multiple sources' + if len(entity_ids) > 10: + entity_list += f' (+{len(entity_ids) - 10} more)' + + # Calculate window + window_secs = events[-1].ts_epoch - events[0].ts_epoch + if window_secs < 120: + window_str = f'{int(window_secs)}s' + else: + window_str = f'{int(window_secs / 60)}m' + + burst_type = rule.get('burst_type', 'burst_generic') + + data = { + 'hostname': first.data.get('hostname', socket.gethostname()), + 'count': str(len(events)), + 'window': window_str, + 'entity_list': entity_list, + 'event_type': first.event_type, + } + + return NotificationEvent( + event_type=burst_type, + severity=max_severity, + data=data, + source='aggregator', + entity=first.entity, + entity_id='burst', + ) + + # ─── Notification Manager ───────────────────────────────────────── class NotificationManager: @@ -81,9 +267,17 @@ class NotificationManager: self._polling_collector: Optional[PollingCollector] = None self._dispatch_thread: Optional[threading.Thread] = None - # Cooldown tracking: {event_type_or_key: last_sent_timestamp} + # Webhook receiver (no thread, passive) + self._hook_watcher: Optional[ProxmoxHookWatcher] = None + + # Cooldown tracking: {fingerprint: last_sent_timestamp} self._cooldowns: Dict[str, float] = {} + # Storm protection + self._group_limiter = GroupRateLimiter() + self._aggregator = BurstAggregator() + self._aggregation_thread: Optional[threading.Thread] = None + # Stats self._stats = { 'started_at': None, @@ -180,6 +374,7 @@ class NotificationManager: return self._load_config() + self._load_cooldowns_from_db() if not self._enabled: print("[NotificationManager] Service is disabled. Skipping start.") @@ -220,19 +415,48 @@ class NotificationManager: def _dispatch_loop(self): """Main dispatch loop: reads queue -> filters -> formats -> sends -> records.""" + last_cleanup = time.monotonic() + last_flush = time.monotonic() + cleanup_interval = 3600 # Cleanup cooldowns every hour + flush_interval = 5 # Flush aggregation buckets every 5s + while self._running: try: event = self._event_queue.get(timeout=2) except Empty: + # Periodic maintenance during idle + now_mono = time.monotonic() + if now_mono - last_cleanup > cleanup_interval: + self._cleanup_old_cooldowns() + last_cleanup = now_mono + # Flush expired aggregation buckets + if now_mono - last_flush > flush_interval: + self._flush_aggregation() + last_flush = now_mono continue try: self._process_event(event) except Exception as e: print(f"[NotificationManager] Dispatch error: {e}") + + # Also flush aggregation after each event + if time.monotonic() - last_flush > flush_interval: + self._flush_aggregation() + last_flush = time.monotonic() + + def _flush_aggregation(self): + """Flush expired aggregation buckets and dispatch summaries.""" + try: + summaries = self._aggregator.flush_expired() + for summary_event in summaries: + # Burst summaries bypass aggregator but still pass cooldown + rate limit + self._process_event_direct(summary_event) + except Exception as e: + print(f"[NotificationManager] Aggregation flush error: {e}") def _process_event(self, event: NotificationEvent): - """Process a single event from the queue.""" + """Process a single event: filter -> aggregate -> cooldown -> rate limit -> dispatch.""" if not self._enabled: return @@ -246,14 +470,43 @@ class NotificationManager: if not self._meets_severity(event.severity, min_severity): return + # Try aggregation (may buffer the event) + result = self._aggregator.ingest(event) + if result is None: + return # Buffered, will be flushed as summary later + event = result # Use original event (first in burst passes through) + + # From here, proceed with dispatch (shared with _process_event_direct) + self._dispatch_event(event) + + def _process_event_direct(self, event: NotificationEvent): + """Process a burst summary event. Bypasses aggregator but applies all other filters.""" + if not self._enabled: + return + + # Check severity filter + min_severity = self._config.get('filter.min_severity', 'INFO') + if not self._meets_severity(event.severity, min_severity): + return + + self._dispatch_event(event) + + def _dispatch_event(self, event: NotificationEvent): + """Shared dispatch pipeline: cooldown -> rate limit -> render -> send.""" # Check cooldown if not self._check_cooldown(event): return - # Render message from template + # Check group rate limit + template = TEMPLATES.get(event.event_type, {}) + group = template.get('group', 'system') + if not self._group_limiter.allow(group): + return + + # Render message from template (structured output) rendered = render_template(event.event_type, event.data) - # Optional AI enhancement + # Optional AI enhancement (on text body only) ai_config = { 'enabled': self._config.get('ai_enabled', 'false'), 'provider': self._config.get('ai_provider', ''), @@ -264,10 +517,15 @@ class NotificationManager: rendered['title'], rendered['body'], rendered['severity'], ai_config ) + # Enrich data with structured fields for channels that support them + enriched_data = dict(event.data) + enriched_data['_rendered_fields'] = rendered.get('fields', []) + enriched_data['_body_html'] = rendered.get('body_html', '') + # Send through all active channels self._dispatch_to_channels( rendered['title'], body, rendered['severity'], - event.event_type, event.data, event.source + event.event_type, enriched_data, event.source ) def _dispatch_to_channels(self, title: str, body: str, severity: str, @@ -323,20 +581,67 @@ class NotificationManager: else: cooldown = DEFAULT_COOLDOWNS.get(event.severity, 300) - # CRITICAL events have zero cooldown by default + # CRITICAL events: 60s minimum cooldown (prevents storm, but delivers fast) if event.severity == 'CRITICAL' and cooldown_str is None: - cooldown = 0 + cooldown = 60 - # Check against last sent time - dedup_key = f"{event.event_type}:{event.data.get('category', '')}:{event.data.get('vmid', '')}" - last_sent = self._cooldowns.get(dedup_key, 0) + # Check against last sent time using stable fingerprint + last_sent = self._cooldowns.get(event.fingerprint, 0) if now - last_sent < cooldown: return False - self._cooldowns[dedup_key] = now + self._cooldowns[event.fingerprint] = now + self._persist_cooldown(event.fingerprint, now) return True + def _load_cooldowns_from_db(self): + """Load persistent cooldown state from SQLite (up to 48h).""" + try: + if not DB_PATH.exists(): + return + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + cursor = conn.cursor() + cursor.execute('SELECT fingerprint, last_sent_ts FROM notification_last_sent') + now = time.time() + for fp, ts in cursor.fetchall(): + if now - ts < 172800: # 48h window + self._cooldowns[fp] = ts + conn.close() + except Exception as e: + print(f"[NotificationManager] Failed to load cooldowns: {e}") + + def _persist_cooldown(self, fingerprint: str, ts: float): + """Save cooldown timestamp to SQLite for restart persistence.""" + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + conn.execute(''' + INSERT OR REPLACE INTO notification_last_sent (fingerprint, last_sent_ts, count) + VALUES (?, ?, COALESCE( + (SELECT count + 1 FROM notification_last_sent WHERE fingerprint = ?), 1 + )) + ''', (fingerprint, int(ts), fingerprint)) + conn.commit() + conn.close() + except Exception: + pass # Non-critical, in-memory cooldown still works + + def _cleanup_old_cooldowns(self): + """Remove cooldown entries older than 48h from both memory and DB.""" + cutoff = time.time() - 172800 # 48h + self._cooldowns = {k: v for k, v in self._cooldowns.items() if v > cutoff} + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('DELETE FROM notification_last_sent WHERE last_sent_ts < ?', (int(cutoff),)) + conn.commit() + conn.close() + except Exception: + pass + @staticmethod def _meets_severity(event_severity: str, min_severity: str) -> bool: """Check if event severity meets the minimum threshold.""" @@ -487,6 +792,31 @@ class NotificationManager: 'results': results, } + # ─── Proxmox Webhook ────────────────────────────────────────── + + def process_webhook(self, payload: dict) -> dict: + """Process incoming Proxmox webhook. Delegates to ProxmoxHookWatcher.""" + if not self._hook_watcher: + self._hook_watcher = ProxmoxHookWatcher(self._event_queue) + return self._hook_watcher.process_webhook(payload) + + def get_webhook_secret(self) -> str: + """Get configured webhook secret, or empty string if none.""" + if not self._config: + self._load_config() + return self._config.get('webhook_secret', '') + + def get_webhook_allowed_ips(self) -> list: + """Get list of allowed IPs for webhook, or empty list (allow all).""" + if not self._config: + self._load_config() + raw = self._config.get('webhook_allowed_ips', '') + if not raw: + return [] + return [ip.strip() for ip in str(raw).split(',') if ip.strip()] + + # ─── Status & Settings ────────────────────────────────────── + def get_status(self) -> Dict[str, Any]: """Get current service status.""" if not self._config: @@ -618,6 +948,8 @@ class NotificationManager: 'event_groups': EVENT_GROUPS, 'event_types': get_event_types_by_group(), 'default_events': get_default_enabled_events(), + 'webhook_secret': self._config.get('webhook_secret', ''), + 'webhook_allowed_ips': self._config.get('webhook_allowed_ips', ''), } def save_settings(self, settings: Dict[str, str]) -> Dict[str, Any]: diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py index cf4ac832..d76850b7 100644 --- a/AppImage/scripts/notification_templates.py +++ b/AppImage/scripts/notification_templates.py @@ -313,6 +313,38 @@ TEMPLATES = { 'group': 'system', 'default_enabled': False, }, + + # ── Burst aggregation summaries ── + 'burst_auth_fail': { + 'title': '{hostname}: {count} auth failures in {window}', + 'body': '{count} authentication failures detected in {window}.\nSources: {entity_list}', + 'group': 'security', + 'default_enabled': True, + }, + 'burst_ip_block': { + 'title': '{hostname}: Fail2Ban banned {count} IPs in {window}', + 'body': '{count} IPs banned by Fail2Ban in {window}.\nIPs: {entity_list}', + 'group': 'security', + 'default_enabled': True, + }, + 'burst_disk_io': { + 'title': '{hostname}: {count} disk I/O errors on {entity_list}', + 'body': '{count} I/O errors detected in {window}.\nDevices: {entity_list}', + 'group': 'storage', + 'default_enabled': True, + }, + 'burst_cluster': { + 'title': '{hostname}: Cluster flapping detected ({count} changes)', + 'body': 'Cluster state changed {count} times in {window}.\nNodes: {entity_list}', + 'group': 'cluster', + 'default_enabled': True, + }, + 'burst_generic': { + 'title': '{hostname}: {count} {event_type} events in {window}', + 'body': '{count} events of type {event_type} in {window}.\n{entity_list}', + 'group': 'system', + 'default_enabled': True, + }, } # ─── Event Groups (for UI filtering) ───────────────────────────── @@ -339,23 +371,24 @@ def _get_hostname() -> str: return 'proxmox' -def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, str]: - """Render a template with the given data. +def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]: + """Render a template into a structured notification object. - Args: - event_type: Key from TEMPLATES dict - data: Variables to fill into the template - - Returns: - {'title': rendered_title, 'body': rendered_body, 'severity': severity} + Returns structured output usable by all channels: + title, body (text), body_text, body_html (escaped), fields, tags, severity, group """ + import html as html_mod + template = TEMPLATES.get(event_type) if not template: - # Fallback for unknown event types + fallback_body = data.get('message', data.get('reason', str(data))) + severity = data.get('severity', 'INFO') return { 'title': f"{_get_hostname()}: {event_type}", - 'body': data.get('message', data.get('reason', str(data))), - 'severity': data.get('severity', 'INFO'), + 'body': fallback_body, 'body_text': fallback_body, + 'body_html': f'

{html_mod.escape(str(fallback_body))}

', + 'fields': [], 'tags': [severity, 'system', event_type], + 'severity': severity, 'group': 'system', } # Ensure hostname is always available @@ -363,58 +396,65 @@ def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, str]: 'hostname': _get_hostname(), 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'severity': data.get('severity', 'INFO'), + # Burst event variables + 'window': '', 'entity_list': '', # Common defaults - 'vmid': '', - 'vmname': '', - 'reason': '', - 'summary': '', - 'details': '', - 'category': '', - 'previous': '', - 'current': '', - 'duration': '', - 'value': '', - 'threshold': '', - 'source_ip': '', - 'username': '', - 'service': '', - 'service_name': '', - 'node_name': '', - 'target_node': '', - 'mount': '', - 'device': '', - 'used': '', - 'total': '', - 'available': '', - 'cores': '', - 'count': '', - 'size': '', - 'snapshot_name': '', - 'jail': '', - 'failures': '', - 'quorum': '', - 'change_details': '', - 'message': '', + 'vmid': '', 'vmname': '', 'reason': '', 'summary': '', + 'details': '', 'category': '', 'previous': '', 'current': '', + 'duration': '', 'value': '', 'threshold': '', + 'source_ip': '', 'username': '', 'service': '', 'service_name': '', + 'node_name': '', 'target_node': '', 'mount': '', 'device': '', + 'used': '', 'total': '', 'available': '', 'cores': '', + 'count': '', 'size': '', 'snapshot_name': '', 'jail': '', + 'failures': '', 'quorum': '', 'change_details': '', 'message': '', } variables.update(data) try: title = template['title'].format(**variables) except (KeyError, ValueError): - title = template['title'] # Use raw template if formatting fails + title = template['title'] try: - body = template['body'].format(**variables) + body_text = template['body'].format(**variables) except (KeyError, ValueError): - body = template['body'] + body_text = template['body'] # Clean up empty lines from missing optional variables - body = '\n'.join(line for line in body.split('\n') if line.strip()) + body_text = '\n'.join(line for line in body_text.split('\n') if line.strip()) + + severity = variables.get('severity', 'INFO') + group = template.get('group', 'system') + + # Build structured fields for Discord embeds / rich notifications + fields = [] + field_map = [ + ('vmid', 'VM/CT'), ('vmname', 'Name'), ('device', 'Device'), + ('source_ip', 'Source IP'), ('node_name', 'Node'), ('category', 'Category'), + ('service_name', 'Service'), ('jail', 'Jail'), ('username', 'User'), + ('count', 'Count'), ('window', 'Window'), ('entity_list', 'Affected'), + ] + for key, label in field_map: + val = variables.get(key, '') + if val: + fields.append((label, str(val))) + + # Build HTML body with escaped content + body_html_parts = [] + for line in body_text.split('\n'): + if line.strip(): + body_html_parts.append(f'

{html_mod.escape(line)}

') + body_html = '\n'.join(body_html_parts) if body_html_parts else f'

{html_mod.escape(body_text)}

' return { 'title': title, - 'body': body, - 'severity': variables.get('severity', 'INFO'), + 'body': body_text, # backward compat + 'body_text': body_text, + 'body_html': body_html, + 'fields': fields, + 'tags': [severity, group, event_type], + 'severity': severity, + 'group': group, }