From 34d04e57dd69f3a2d218047887d63352391364ec Mon Sep 17 00:00:00 2001 From: MacRimi Date: Wed, 18 Feb 2026 17:24:26 +0100 Subject: [PATCH] Update notification service --- AppImage/components/notification-settings.tsx | 827 ++++++++++++++++++ AppImage/components/settings.tsx | 4 + AppImage/scripts/build_appimage.sh | 5 + AppImage/scripts/flask_notification_routes.py | 102 +++ AppImage/scripts/flask_server.py | 13 + AppImage/scripts/health_persistence.py | 18 + AppImage/scripts/notification_channels.py | 402 +++++++++ AppImage/scripts/notification_events.py | 698 +++++++++++++++ AppImage/scripts/notification_manager.py | 770 ++++++++++++++++ AppImage/scripts/notification_templates.py | 554 ++++++++++++ 10 files changed, 3393 insertions(+) create mode 100644 AppImage/components/notification-settings.tsx create mode 100644 AppImage/scripts/flask_notification_routes.py create mode 100644 AppImage/scripts/notification_channels.py create mode 100644 AppImage/scripts/notification_events.py create mode 100644 AppImage/scripts/notification_manager.py create mode 100644 AppImage/scripts/notification_templates.py diff --git a/AppImage/components/notification-settings.tsx b/AppImage/components/notification-settings.tsx new file mode 100644 index 00000000..07e43881 --- /dev/null +++ b/AppImage/components/notification-settings.tsx @@ -0,0 +1,827 @@ +"use client" + +import { useState, useEffect, useCallback } from "react" +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "./ui/card" +import { Tabs, TabsList, TabsTrigger, TabsContent } from "./ui/tabs" +import { Input } from "./ui/input" +import { Label } from "./ui/label" +import { Badge } from "./ui/badge" +import { Checkbox } from "./ui/checkbox" +import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "./ui/select" +import { fetchApi } from "../lib/api-config" +import { + Bell, BellOff, Send, CheckCircle2, XCircle, Loader2, + AlertTriangle, Info, Settings2, Zap, Eye, EyeOff, + 
Trash2, ChevronDown, ChevronUp, TestTube2 +} from "lucide-react" + +interface ChannelConfig { + enabled: boolean + bot_token?: string + chat_id?: string + url?: string + token?: string + webhook_url?: string +} + +interface NotificationConfig { + enabled: boolean + channels: Record + severity_filter: string + event_categories: Record + ai_enabled: boolean + ai_provider: string + ai_api_key: string + ai_model: string + hostname: string +} + +interface ServiceStatus { + enabled: boolean + running: boolean + channels: Record + queue_size: number + last_sent: string | null + total_sent_24h: number +} + +interface HistoryEntry { + id: number + event_type: string + channel: string + title: string + severity: string + sent_at: string + success: boolean + error_message: string | null +} + +const SEVERITY_OPTIONS = [ + { value: "critical", label: "Critical only" }, + { value: "warning", label: "Warning + Critical" }, + { value: "info", label: "All (Info + Warning + Critical)" }, +] + +const EVENT_CATEGORIES = [ + { key: "system", label: "System", desc: "Startup, shutdown, kernel events" }, + { key: "vm_ct", label: "VM / CT", desc: "Start, stop, crash, migration" }, + { key: "backup", label: "Backups", desc: "Backup start, complete, fail" }, + { key: "resources", label: "Resources", desc: "CPU, memory, temperature" }, + { key: "storage", label: "Storage", desc: "Disk space, I/O errors, SMART" }, + { key: "network", label: "Network", desc: "Connectivity, bond, latency" }, + { key: "security", label: "Security", desc: "Auth failures, fail2ban, firewall" }, + { key: "cluster", label: "Cluster", desc: "Quorum, split-brain, HA fencing" }, +] + +const AI_PROVIDERS = [ + { value: "openai", label: "OpenAI" }, + { value: "groq", label: "Groq" }, +] + +const DEFAULT_CONFIG: NotificationConfig = { + enabled: false, + channels: { + telegram: { enabled: false }, + gotify: { enabled: false }, + discord: { enabled: false }, + }, + severity_filter: "warning", + event_categories: { + system: 
true, vm_ct: true, backup: true, resources: true, + storage: true, network: true, security: true, cluster: true, + }, + ai_enabled: false, + ai_provider: "openai", + ai_api_key: "", + ai_model: "", + hostname: "", +} + +export function NotificationSettings() { + const [config, setConfig] = useState(DEFAULT_CONFIG) + const [status, setStatus] = useState(null) + const [history, setHistory] = useState([]) + const [loading, setLoading] = useState(true) + const [saving, setSaving] = useState(false) + const [saved, setSaved] = useState(false) + const [testing, setTesting] = useState(null) + const [testResult, setTestResult] = useState<{ channel: string; success: boolean; message: string } | null>(null) + const [showHistory, setShowHistory] = useState(false) + const [showAdvanced, setShowAdvanced] = useState(false) + const [showSecrets, setShowSecrets] = useState>({}) + const [editMode, setEditMode] = useState(false) + const [hasChanges, setHasChanges] = useState(false) + const [originalConfig, setOriginalConfig] = useState(DEFAULT_CONFIG) + + const loadConfig = useCallback(async () => { + try { + const data = await fetchApi<{ success: boolean; config: NotificationConfig }>("/api/notifications/settings") + if (data.success && data.config) { + setConfig(data.config) + setOriginalConfig(data.config) + } + } catch (err) { + console.error("Failed to load notification settings:", err) + } finally { + setLoading(false) + } + }, []) + + const loadStatus = useCallback(async () => { + try { + const data = await fetchApi<{ success: boolean } & ServiceStatus>("/api/notifications/status") + if (data.success) { + setStatus(data) + } + } catch { + // Service may not be running yet + } + }, []) + + const loadHistory = useCallback(async () => { + try { + const data = await fetchApi<{ success: boolean; history: HistoryEntry[]; total: number }>("/api/notifications/history?limit=20") + if (data.success) { + setHistory(data.history || []) + } + } catch { + // Ignore + } + }, []) + + 
useEffect(() => { + loadConfig() + loadStatus() + }, [loadConfig, loadStatus]) + + useEffect(() => { + if (showHistory) loadHistory() + }, [showHistory, loadHistory]) + + const updateConfig = (updater: (prev: NotificationConfig) => NotificationConfig) => { + setConfig(prev => { + const next = updater(prev) + setHasChanges(true) + return next + }) + } + + const updateChannel = (channel: string, field: string, value: string | boolean) => { + updateConfig(prev => ({ + ...prev, + channels: { + ...prev.channels, + [channel]: { ...prev.channels[channel], [field]: value }, + }, + })) + } + + const handleSave = async () => { + setSaving(true) + try { + await fetchApi("/api/notifications/settings", { + method: "POST", + body: JSON.stringify(config), + }) + setOriginalConfig(config) + setHasChanges(false) + setEditMode(false) + setSaved(true) + setTimeout(() => setSaved(false), 3000) + loadStatus() + } catch (err) { + console.error("Failed to save notification settings:", err) + } finally { + setSaving(false) + } + } + + const handleCancel = () => { + setConfig(originalConfig) + setHasChanges(false) + setEditMode(false) + } + + const handleTest = async (channel: string) => { + setTesting(channel) + setTestResult(null) + try { + const data = await fetchApi<{ success: boolean; message: string }>("/api/notifications/test", { + method: "POST", + body: JSON.stringify({ channel }), + }) + setTestResult({ channel, success: data.success, message: data.message }) + } catch (err) { + setTestResult({ channel, success: false, message: String(err) }) + } finally { + setTesting(null) + setTimeout(() => setTestResult(null), 5000) + } + } + + const handleClearHistory = async () => { + try { + await fetchApi("/api/notifications/history", { method: "DELETE" }) + setHistory([]) + } catch { + // Ignore + } + } + + const toggleSecret = (key: string) => { + setShowSecrets(prev => ({ ...prev, [key]: !prev[key] })) + } + + if (loading) { + return ( + + +
+ + Notifications +
+
+ +
+
+
+ + + ) + } + + const activeChannels = Object.entries(config.channels).filter(([, ch]) => ch.enabled).length + + return ( + + +
+
+ + Notifications + {config.enabled && ( + + Active + + )} +
+
+ {saved && ( + + + Saved + + )} + {editMode ? ( + <> + + + + ) : ( + + )} +
+
+ + Configure notification channels and event filters. Receive alerts via Telegram, Gotify, or Discord. + +
+ + + {/* ── Service Status ── */} + {status && ( +
+
+
+ + {status.running ? "Service running" : "Service stopped"} + + {status.total_sent_24h > 0 && ( + + {status.total_sent_24h} sent in last 24h + + )} +
+ {activeChannels > 0 && ( + + {activeChannels} channel{activeChannels > 1 ? "s" : ""} + + )} +
+ )} + + {/* ── Enable/Disable ── */} +
+
+ {config.enabled ? ( + + ) : ( + + )} +
+ Enable Notifications +

Activate the notification service

+
+
+ +
+ + {config.enabled && ( + <> + {/* ── Channel Configuration ── */} +
+
+ + Channels +
+ + + + + Telegram + + + Gotify + + + Discord + + + + {/* Telegram */} + +
+ + +
+ {config.channels.telegram?.enabled && ( + <> +
+ +
+ updateChannel("telegram", "bot_token", e.target.value)} + disabled={!editMode} + /> + +
+
+
+ + updateChannel("telegram", "chat_id", e.target.value)} + disabled={!editMode} + /> +
+ {!editMode && config.channels.telegram?.bot_token && ( + + )} + + )} +
+ + {/* Gotify */} + +
+ + +
+ {config.channels.gotify?.enabled && ( + <> +
+ + updateChannel("gotify", "url", e.target.value)} + disabled={!editMode} + /> +
+
+ +
+ updateChannel("gotify", "token", e.target.value)} + disabled={!editMode} + /> + +
+
+ {!editMode && config.channels.gotify?.url && ( + + )} + + )} +
+ + {/* Discord */} + +
+ + +
+ {config.channels.discord?.enabled && ( + <> +
+ +
+ updateChannel("discord", "webhook_url", e.target.value)} + disabled={!editMode} + /> + +
+
+ {!editMode && config.channels.discord?.webhook_url && ( + + )} + + )} +
+
+ + {/* Test Result */} + {testResult && ( +
+ {testResult.success ? ( + + ) : ( + + )} + {testResult.message} +
+ )} +
+ + {/* ── Severity Filter ── */} +
+
+ + Severity Filter +
+ +
+ + {/* ── Event Categories ── */} +
+
+ + Event Categories +
+
+ {EVENT_CATEGORIES.map(cat => ( + + ))} +
+
+ + {/* ── Advanced: AI Enhancement ── */} +
+ + + {showAdvanced && ( +
+
+
+ AI-Enhanced Messages +

Use AI to generate contextual notification messages

+
+ +
+ + {config.ai_enabled && ( + <> +
+ + +
+
+ +
+ updateConfig(p => ({ ...p, ai_api_key: e.target.value }))} + disabled={!editMode} + /> + +
+
+
+ + updateConfig(p => ({ ...p, ai_model: e.target.value }))} + disabled={!editMode} + /> +
+
+ +

+ AI enhancement is optional. When enabled, notifications include contextual analysis and recommended actions. If the AI service is unavailable, standard templates are used as fallback. +

+
+ + )} +
+ )} +
+ + {/* ── Notification History ── */} +
+ + + {showHistory && ( +
+ {history.length === 0 ? ( +

No notifications sent yet

+ ) : ( + <> +
+ +
+
+ {history.map(entry => ( +
+ {entry.success ? ( + + ) : ( + + )} +
+ {entry.title || entry.event_type} + + {entry.channel} - {new Date(entry.sent_at).toLocaleString()} + +
+ + {entry.severity} + +
+ ))} +
+ + )} +
+ )} +
+ + )} + + {/* ── Footer info ── */} +
+ +

+ {config.enabled + ? "Notifications are active. Events matching your severity filter and category selection will be sent to configured channels." + : "Enable notifications to receive alerts about system events, health status changes, and security incidents via Telegram, Gotify, or Discord."} +

+
+ + + ) +} diff --git a/AppImage/components/settings.tsx b/AppImage/components/settings.tsx index 4f037221..f2631177 100644 --- a/AppImage/components/settings.tsx +++ b/AppImage/components/settings.tsx @@ -3,6 +3,7 @@ import { useState, useEffect } from "react" import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "./ui/card" import { Wrench, Package, Ruler, HeartPulse, Cpu, MemoryStick, HardDrive, CircleDot, Network, Server, Settings2, FileText, RefreshCw, Shield, AlertTriangle, Info, Loader2, Check } from "lucide-react" +import { NotificationSettings } from "./notification-settings" import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "./ui/select" import { Input } from "./ui/input" import { Badge } from "./ui/badge" @@ -438,6 +439,9 @@ export function Settings() { + {/* Notification Settings */} + + {/* ProxMenux Optimizations */} diff --git a/AppImage/scripts/build_appimage.sh b/AppImage/scripts/build_appimage.sh index 447dd60e..3b5f3090 100644 --- a/AppImage/scripts/build_appimage.sh +++ b/AppImage/scripts/build_appimage.sh @@ -91,6 +91,11 @@ cp "$SCRIPT_DIR/proxmox_storage_monitor.py" "$APP_DIR/usr/bin/" 2>/dev/null || e cp "$SCRIPT_DIR/flask_script_runner.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_script_runner.py not found" cp "$SCRIPT_DIR/security_manager.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ security_manager.py not found" cp "$SCRIPT_DIR/flask_security_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_security_routes.py not found" +cp "$SCRIPT_DIR/notification_manager.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_manager.py not found" +cp "$SCRIPT_DIR/notification_channels.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_channels.py not found" +cp "$SCRIPT_DIR/notification_templates.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_templates.py not found" +cp "$SCRIPT_DIR/notification_events.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ 
notification_events.py not found" +cp "$SCRIPT_DIR/flask_notification_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_notification_routes.py not found" echo "📋 Adding translation support..." cat > "$APP_DIR/usr/bin/translate_cli.py" << 'PYEOF' diff --git a/AppImage/scripts/flask_notification_routes.py b/AppImage/scripts/flask_notification_routes.py new file mode 100644 index 00000000..6f37e867 --- /dev/null +++ b/AppImage/scripts/flask_notification_routes.py @@ -0,0 +1,102 @@ +""" +Flask routes for notification service configuration and management. +Blueprint pattern matching flask_health_routes.py / flask_security_routes.py. +""" + +from flask import Blueprint, jsonify, request +from notification_manager import notification_manager + +notification_bp = Blueprint('notifications', __name__) + + +@notification_bp.route('/api/notifications/settings', methods=['GET']) +def get_notification_settings(): + """Get all notification settings for the UI.""" + try: + settings = notification_manager.get_settings() + return jsonify(settings) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@notification_bp.route('/api/notifications/settings', methods=['POST']) +def save_notification_settings(): + """Save notification settings from the UI.""" + try: + payload = request.get_json() + if not payload: + return jsonify({'error': 'No data provided'}), 400 + + result = notification_manager.save_settings(payload) + return jsonify(result) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@notification_bp.route('/api/notifications/test', methods=['POST']) +def test_notification(): + """Send a test notification to one or all channels.""" + try: + data = request.get_json() or {} + channel = data.get('channel', 'all') + + result = notification_manager.test_channel(channel) + return jsonify(result) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@notification_bp.route('/api/notifications/status', methods=['GET']) 
+def get_notification_status(): + """Get notification service status.""" + try: + status = notification_manager.get_status() + return jsonify(status) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@notification_bp.route('/api/notifications/history', methods=['GET']) +def get_notification_history(): + """Get notification history with optional filters.""" + try: + limit = request.args.get('limit', 50, type=int) + offset = request.args.get('offset', 0, type=int) + severity = request.args.get('severity', '') + channel = request.args.get('channel', '') + + result = notification_manager.get_history(limit, offset, severity, channel) + return jsonify(result) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@notification_bp.route('/api/notifications/history', methods=['DELETE']) +def clear_notification_history(): + """Clear all notification history.""" + try: + result = notification_manager.clear_history() + return jsonify(result) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@notification_bp.route('/api/notifications/send', methods=['POST']) +def send_notification(): + """Send a notification via API (for testing or external triggers).""" + try: + data = request.get_json() + if not data: + return jsonify({'error': 'No data provided'}), 400 + + result = notification_manager.send_notification( + event_type=data.get('event_type', 'custom'), + severity=data.get('severity', 'INFO'), + title=data.get('title', ''), + message=data.get('message', ''), + data=data.get('data', {}), + source='api' + ) + return jsonify(result) + except Exception as e: + return jsonify({'error': str(e)}), 500 diff --git a/AppImage/scripts/flask_server.py b/AppImage/scripts/flask_server.py index 003d15f4..9c0088ef 100644 --- a/AppImage/scripts/flask_server.py +++ b/AppImage/scripts/flask_server.py @@ -46,6 +46,8 @@ from flask_health_routes import health_bp # noqa: E402 from flask_auth_routes import auth_bp # noqa: E402 from 
flask_proxmenux_routes import proxmenux_bp # noqa: E402 from flask_security_routes import security_bp # noqa: E402 +from flask_notification_routes import notification_bp # noqa: E402 +from notification_manager import notification_manager # noqa: E402 from jwt_middleware import require_auth # noqa: E402 import auth_manager # noqa: E402 @@ -120,6 +122,7 @@ app.register_blueprint(auth_bp) app.register_blueprint(health_bp) app.register_blueprint(proxmenux_bp) app.register_blueprint(security_bp) +app.register_blueprint(notification_bp) # Initialize terminal / WebSocket routes init_terminal_routes(app) @@ -7094,6 +7097,16 @@ if __name__ == '__main__': except Exception as e: print(f"[ProxMenux] Vital signs sampler failed to start: {e}") + # ── Notification Service ── + try: + notification_manager.start() + if notification_manager._enabled: + print(f"[ProxMenux] Notification service started (channels: {list(notification_manager._channels.keys())})") + else: + print("[ProxMenux] Notification service loaded (disabled - configure in Settings)") + except Exception as e: + print(f"[ProxMenux] Notification service failed to start: {e}") + # Check for SSL configuration ssl_ctx = None try: diff --git a/AppImage/scripts/health_persistence.py b/AppImage/scripts/health_persistence.py index 377f71da..1bf3a78f 100644 --- a/AppImage/scripts/health_persistence.py +++ b/AppImage/scripts/health_persistence.py @@ -114,6 +114,22 @@ class HealthPersistence: ) ''') + # Notification history table (records all sent notifications) + cursor.execute(''' + CREATE TABLE IF NOT EXISTS notification_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_type TEXT NOT NULL, + channel TEXT NOT NULL, + title TEXT, + message TEXT, + severity TEXT, + sent_at TEXT NOT NULL, + success INTEGER DEFAULT 1, + error_message TEXT, + source TEXT DEFAULT 'server' + ) + ''') + # Migration: add suppression_hours column to errors if not present cursor.execute("PRAGMA table_info(errors)") columns = [col[1] for col in 
cursor.fetchall()] @@ -125,6 +141,8 @@ class HealthPersistence: cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON errors(category)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_resolved ON errors(resolved_at)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_events_error ON events(error_key)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_notif_sent_at ON notification_history(sent_at)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_notif_severity ON notification_history(severity)') conn.commit() conn.close() diff --git a/AppImage/scripts/notification_channels.py b/AppImage/scripts/notification_channels.py new file mode 100644 index 00000000..f1c2e1f2 --- /dev/null +++ b/AppImage/scripts/notification_channels.py @@ -0,0 +1,402 @@ +""" +ProxMenux Notification Channels +Provides transport adapters for Telegram, Gotify, and Discord. + +Each channel implements send() and test() with: +- Retry with exponential backoff (3 attempts) +- Request timeout of 10s +- Rate limiting (max 30 msg/min per channel) + +Author: MacRimi +""" + +import json +import time +import urllib.request +import urllib.error +import urllib.parse +from abc import ABC, abstractmethod +from collections import deque +from typing import Tuple, Optional, Dict, Any + + +# ─── Rate Limiter ──────────────────────────────────────────────── + +class RateLimiter: + """Token-bucket rate limiter: max N messages per window.""" + + def __init__(self, max_calls: int = 30, window_seconds: int = 60): + self.max_calls = max_calls + self.window = window_seconds + self._timestamps: deque = deque() + + def allow(self) -> bool: + now = time.monotonic() + while self._timestamps and now - self._timestamps[0] > self.window: + self._timestamps.popleft() + if len(self._timestamps) >= self.max_calls: + return False + self._timestamps.append(now) + return True + + def wait_time(self) -> float: + if not self._timestamps: + return 0.0 + return max(0.0, self.window - (time.monotonic() - self._timestamps[0])) + + +# 
─── Base Channel ──────────────────────────────────────────────── + +class NotificationChannel(ABC): + """Abstract base for all notification channels.""" + + MAX_RETRIES = 3 + RETRY_DELAYS = [2, 4, 8] # exponential backoff seconds + REQUEST_TIMEOUT = 10 + + def __init__(self): + self._rate_limiter = RateLimiter(max_calls=30, window_seconds=60) + + @abstractmethod + def send(self, title: str, message: str, severity: str = 'INFO', + data: Optional[Dict] = None) -> Dict[str, Any]: + """Send a notification. Returns {success, error, channel}.""" + pass + + @abstractmethod + def test(self) -> Tuple[bool, str]: + """Send a test message. Returns (success, error_message).""" + pass + + @abstractmethod + def validate_config(self) -> Tuple[bool, str]: + """Check if config is valid without sending. Returns (valid, error).""" + pass + + def _http_request(self, url: str, data: bytes, headers: Dict[str, str], + method: str = 'POST') -> Tuple[int, str]: + """Execute HTTP request with timeout. Returns (status_code, body).""" + req = urllib.request.Request(url, data=data, headers=headers, method=method) + try: + with urllib.request.urlopen(req, timeout=self.REQUEST_TIMEOUT) as resp: + body = resp.read().decode('utf-8', errors='replace') + return resp.status, body + except urllib.error.HTTPError as e: + body = e.read().decode('utf-8', errors='replace') if e.fp else str(e) + return e.code, body + except urllib.error.URLError as e: + return 0, str(e.reason) + except Exception as e: + return 0, str(e) + + def _send_with_retry(self, send_fn) -> Dict[str, Any]: + """Wrap a send function with rate limiting and retry logic.""" + if not self._rate_limiter.allow(): + wait = self._rate_limiter.wait_time() + return { + 'success': False, + 'error': f'Rate limited. 
Retry in {wait:.0f}s', + 'rate_limited': True + } + + last_error = '' + for attempt in range(self.MAX_RETRIES): + try: + status, body = send_fn() + if 200 <= status < 300: + return {'success': True, 'error': None} + last_error = f'HTTP {status}: {body[:200]}' + except Exception as e: + last_error = str(e) + + if attempt < self.MAX_RETRIES - 1: + time.sleep(self.RETRY_DELAYS[attempt]) + + return {'success': False, 'error': last_error} + + +# ─── Telegram ──────────────────────────────────────────────────── + +class TelegramChannel(NotificationChannel): + """Telegram Bot API channel using HTML parse mode.""" + + API_BASE = 'https://api.telegram.org/bot{token}/sendMessage' + MAX_LENGTH = 4096 + + SEVERITY_ICONS = { + 'CRITICAL': '\U0001F534', # red circle + 'WARNING': '\U0001F7E1', # yellow circle + 'INFO': '\U0001F535', # blue circle + 'OK': '\U0001F7E2', # green circle + 'UNKNOWN': '\u26AA', # white circle + } + + def __init__(self, bot_token: str, chat_id: str): + super().__init__() + self.bot_token = bot_token.strip() + self.chat_id = chat_id.strip() + + def validate_config(self) -> Tuple[bool, str]: + if not self.bot_token: + return False, 'Bot token is required' + if not self.chat_id: + return False, 'Chat ID is required' + if ':' not in self.bot_token: + return False, 'Invalid bot token format (expected BOT_ID:TOKEN)' + return True, '' + + def send(self, title: str, message: str, severity: str = 'INFO', + data: Optional[Dict] = None) -> Dict[str, Any]: + icon = self.SEVERITY_ICONS.get(severity, self.SEVERITY_ICONS['INFO']) + html_msg = f"{icon} {self._escape_html(title)}\n\n{self._escape_html(message)}" + + # Split long messages + chunks = self._split_message(html_msg) + result = {'success': True, 'error': None, 'channel': 'telegram'} + + for chunk in chunks: + res = self._send_with_retry(lambda c=chunk: self._post_message(c)) + if not res['success']: + result = {**res, 'channel': 'telegram'} + break + + return result + + def test(self) -> Tuple[bool, str]: + 
valid, err = self.validate_config() + if not valid: + return False, err + + result = self.send( + 'ProxMenux Test', + 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', + 'INFO' + ) + return result['success'], result.get('error', '') + + def _post_message(self, text: str) -> Tuple[int, str]: + url = self.API_BASE.format(token=self.bot_token) + payload = json.dumps({ + 'chat_id': self.chat_id, + 'text': text, + 'parse_mode': 'HTML', + 'disable_web_page_preview': True, + }).encode('utf-8') + + return self._http_request(url, payload, {'Content-Type': 'application/json'}) + + def _split_message(self, text: str) -> list: + if len(text) <= self.MAX_LENGTH: + return [text] + chunks = [] + while text: + if len(text) <= self.MAX_LENGTH: + chunks.append(text) + break + split_at = text.rfind('\n', 0, self.MAX_LENGTH) + if split_at == -1: + split_at = self.MAX_LENGTH + chunks.append(text[:split_at]) + text = text[split_at:].lstrip('\n') + return chunks + + @staticmethod + def _escape_html(text: str) -> str: + return (text + .replace('&', '&') + .replace('<', '<') + .replace('>', '>')) + + +# ─── Gotify ────────────────────────────────────────────────────── + +class GotifyChannel(NotificationChannel): + """Gotify push notification channel with priority mapping.""" + + PRIORITY_MAP = { + 'OK': 1, + 'INFO': 2, + 'UNKNOWN': 3, + 'WARNING': 5, + 'CRITICAL': 10, + } + + def __init__(self, server_url: str, app_token: str): + super().__init__() + self.server_url = server_url.rstrip('/').strip() + self.app_token = app_token.strip() + + def validate_config(self) -> Tuple[bool, str]: + if not self.server_url: + return False, 'Server URL is required' + if not self.app_token: + return False, 'Application token is required' + if not self.server_url.startswith(('http://', 'https://')): + return False, 'Server URL must start with http:// or https://' + return True, '' + + def send(self, title: str, message: str, severity: str = 'INFO', + data: 
Optional[Dict] = None) -> Dict[str, Any]: + priority = self.PRIORITY_MAP.get(severity, 2) + + result = self._send_with_retry( + lambda: self._post_message(title, message, priority) + ) + result['channel'] = 'gotify' + return result + + def test(self) -> Tuple[bool, str]: + valid, err = self.validate_config() + if not valid: + return False, err + + result = self.send( + 'ProxMenux Test', + 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', + 'INFO' + ) + return result['success'], result.get('error', '') + + def _post_message(self, title: str, message: str, priority: int) -> Tuple[int, str]: + url = f"{self.server_url}/message?token={self.app_token}" + payload = json.dumps({ + 'title': title, + 'message': message, + 'priority': priority, + 'extras': { + 'client::display': {'contentType': 'text/markdown'} + } + }).encode('utf-8') + + return self._http_request(url, payload, {'Content-Type': 'application/json'}) + + +# ─── Discord ───────────────────────────────────────────────────── + +class DiscordChannel(NotificationChannel): + """Discord webhook channel with color-coded embeds.""" + + MAX_EMBED_DESC = 2048 + + SEVERITY_COLORS = { + 'CRITICAL': 0xED4245, # red + 'WARNING': 0xFEE75C, # yellow + 'INFO': 0x5865F2, # blurple + 'OK': 0x57F287, # green + 'UNKNOWN': 0x99AAB5, # grey + } + + def __init__(self, webhook_url: str): + super().__init__() + self.webhook_url = webhook_url.strip() + + def validate_config(self) -> Tuple[bool, str]: + if not self.webhook_url: + return False, 'Webhook URL is required' + if 'discord.com/api/webhooks/' not in self.webhook_url: + return False, 'Invalid Discord webhook URL' + return True, '' + + def send(self, title: str, message: str, severity: str = 'INFO', + data: Optional[Dict] = None) -> Dict[str, Any]: + color = self.SEVERITY_COLORS.get(severity, 0x5865F2) + + desc = message[:self.MAX_EMBED_DESC] if len(message) > self.MAX_EMBED_DESC else message + + embed = { + 'title': title, + 
'description': desc, + 'color': color, + 'footer': {'text': 'ProxMenux Monitor'}, + 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), + } + + if data: + fields = [] + if data.get('category'): + fields.append({'name': 'Category', 'value': data['category'], 'inline': True}) + if data.get('hostname'): + fields.append({'name': 'Host', 'value': data['hostname'], 'inline': True}) + if data.get('severity'): + fields.append({'name': 'Severity', 'value': data['severity'], 'inline': True}) + if fields: + embed['fields'] = fields + + result = self._send_with_retry( + lambda: self._post_webhook(embed) + ) + result['channel'] = 'discord' + return result + + def test(self) -> Tuple[bool, str]: + valid, err = self.validate_config() + if not valid: + return False, err + + result = self.send( + 'ProxMenux Test', + 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', + 'INFO' + ) + return result['success'], result.get('error', '') + + def _post_webhook(self, embed: Dict) -> Tuple[int, str]: + payload = json.dumps({ + 'username': 'ProxMenux', + 'embeds': [embed] + }).encode('utf-8') + + return self._http_request( + self.webhook_url, payload, {'Content-Type': 'application/json'} + ) + + +# ─── Channel Factory ───────────────────────────────────────────── + +CHANNEL_TYPES = { + 'telegram': { + 'name': 'Telegram', + 'config_keys': ['bot_token', 'chat_id'], + 'class': TelegramChannel, + }, + 'gotify': { + 'name': 'Gotify', + 'config_keys': ['url', 'token'], + 'class': GotifyChannel, + }, + 'discord': { + 'name': 'Discord', + 'config_keys': ['webhook_url'], + 'class': DiscordChannel, + }, +} + + +def create_channel(channel_type: str, config: Dict[str, str]) -> Optional[NotificationChannel]: + """Create a channel instance from type name and config dict. 
+ + Args: + channel_type: 'telegram', 'gotify', or 'discord' + config: Dict with channel-specific keys (see CHANNEL_TYPES) + + Returns: + Channel instance or None if creation fails + """ + try: + if channel_type == 'telegram': + return TelegramChannel( + bot_token=config.get('bot_token', ''), + chat_id=config.get('chat_id', '') + ) + elif channel_type == 'gotify': + return GotifyChannel( + server_url=config.get('url', ''), + app_token=config.get('token', '') + ) + elif channel_type == 'discord': + return DiscordChannel( + webhook_url=config.get('webhook_url', '') + ) + except Exception as e: + print(f"[NotificationChannels] Failed to create {channel_type}: {e}") + return None diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py new file mode 100644 index 00000000..0487344b --- /dev/null +++ b/AppImage/scripts/notification_events.py @@ -0,0 +1,698 @@ +""" +ProxMenux Notification Event Watchers +Detects Proxmox events from journald, PVE task log, and health monitor. + +Architecture: +- JournalWatcher: Real-time stream of journald for critical events +- TaskWatcher: Real-time tail of /var/log/pve/tasks/index for VM/CT/backup events +- PollingCollector: Periodic poll of health_persistence pending notifications + +All watchers put events into a shared Queue consumed by NotificationManager. 
+ +Author: MacRimi +""" + +import os +import re +import json +import time +import socket +import subprocess +import threading +from queue import Queue +from typing import Optional, Dict, Any +from pathlib import Path + + +# ─── Event Object ───────────────────────────────────────────────── + +class NotificationEvent: + """Represents a detected event ready for notification dispatch.""" + + __slots__ = ('event_type', 'severity', 'data', 'timestamp', 'source') + + def __init__(self, event_type: str, severity: str = 'INFO', + data: Optional[Dict[str, Any]] = None, + source: str = 'watcher'): + self.event_type = event_type + self.severity = severity + self.data = data or {} + self.timestamp = time.time() + self.source = source + + def __repr__(self): + return f"NotificationEvent({self.event_type}, {self.severity})" + + +def _hostname() -> str: + try: + return socket.gethostname().split('.')[0] + except Exception: + return 'proxmox' + + +# ─── Journal Watcher (Real-time) ───────────────────────────────── + +class JournalWatcher: + """Watches journald in real-time for critical system events. + + Uses 'journalctl -f -o json' subprocess to stream entries. + Detects: auth failures, kernel panics, OOM, service crashes, + disk I/O errors, split-brain, node disconnect, system shutdown, + fail2ban bans, firewall blocks, permission changes. 
+ """ + + def __init__(self, event_queue: Queue): + self._queue = event_queue + self._running = False + self._thread: Optional[threading.Thread] = None + self._process: Optional[subprocess.Popen] = None + self._hostname = _hostname() + + # Dedup: track recent events to avoid duplicates + self._recent_events: Dict[str, float] = {} + self._dedup_window = 30 # seconds + + def start(self): + """Start the journal watcher thread.""" + if self._running: + return + self._running = True + self._thread = threading.Thread(target=self._watch_loop, daemon=True, + name='journal-watcher') + self._thread.start() + + def stop(self): + """Stop the journal watcher.""" + self._running = False + if self._process: + try: + self._process.terminate() + self._process.wait(timeout=5) + except Exception: + try: + self._process.kill() + except Exception: + pass + + def _watch_loop(self): + """Main watch loop with auto-restart on failure.""" + while self._running: + try: + self._run_journalctl() + except Exception as e: + print(f"[JournalWatcher] Error: {e}") + if self._running: + time.sleep(5) # Wait before restart + + def _run_journalctl(self): + """Run journalctl -f and process output line by line.""" + cmd = ['journalctl', '-f', '-o', 'json', '--no-pager', + '-n', '0'] # Start from now, don't replay history + + self._process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, + text=True, bufsize=1 + ) + + for line in self._process.stdout: + if not self._running: + break + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + self._process_entry(entry) + except (json.JSONDecodeError, KeyError): + # Try plain text matching as fallback + self._process_plain(line) + + if self._process: + self._process.wait() + + def _process_entry(self, entry: Dict): + """Process a parsed journald JSON entry.""" + msg = entry.get('MESSAGE', '') + if not msg or not isinstance(msg, str): + return + + unit = entry.get('_SYSTEMD_UNIT', '') + syslog_id = 
entry.get('SYSLOG_IDENTIFIER', '') + priority = int(entry.get('PRIORITY', 6)) + + self._check_auth_failure(msg, syslog_id, entry) + self._check_fail2ban(msg, syslog_id) + self._check_kernel_critical(msg, syslog_id, priority) + self._check_service_failure(msg, unit) + self._check_disk_io(msg, syslog_id, priority) + self._check_cluster_events(msg, syslog_id) + self._check_system_shutdown(msg, syslog_id) + self._check_permission_change(msg, syslog_id) + self._check_firewall(msg, syslog_id) + + def _process_plain(self, line: str): + """Fallback: process a plain text log line.""" + self._check_auth_failure(line, '', {}) + self._check_fail2ban(line, '') + self._check_kernel_critical(line, '', 6) + self._check_cluster_events(line, '') + self._check_system_shutdown(line, '') + + # ── Detection methods ── + + def _check_auth_failure(self, msg: str, syslog_id: str, entry: Dict): + """Detect authentication failures (SSH, PAM, PVE).""" + patterns = [ + (r'Failed password for (?:invalid user )?(\S+) from (\S+)', 'ssh'), + (r'authentication failure.*rhost=(\S+).*user=(\S+)', 'pam'), + (r'pvedaemon\[.*authentication failure.*rhost=(\S+)', 'pve'), + ] + + for pattern, service in patterns: + match = re.search(pattern, msg, re.IGNORECASE) + if match: + groups = match.groups() + if service == 'ssh': + username, source_ip = groups[0], groups[1] + elif service == 'pam': + source_ip, username = groups[0], groups[1] + else: + source_ip = groups[0] + username = 'unknown' + + self._emit('auth_fail', 'WARNING', { + 'source_ip': source_ip, + 'username': username, + 'service': service, + 'hostname': self._hostname, + }) + return + + def _check_fail2ban(self, msg: str, syslog_id: str): + """Detect Fail2Ban IP bans.""" + if 'fail2ban' not in msg.lower() and syslog_id != 'fail2ban-server': + return + + # Ban detected + ban_match = re.search(r'Ban\s+(\S+)', msg) + if ban_match: + ip = ban_match.group(1) + jail_match = re.search(r'\[(\w+)\]', msg) + jail = jail_match.group(1) if jail_match else 
'unknown' + + self._emit('ip_block', 'INFO', { + 'source_ip': ip, + 'jail': jail, + 'failures': '', + 'hostname': self._hostname, + }) + + def _check_kernel_critical(self, msg: str, syslog_id: str, priority: int): + """Detect kernel panics, OOM, segfaults, hardware errors.""" + critical_patterns = { + r'kernel panic': ('system_problem', 'CRITICAL', 'Kernel panic'), + r'Out of memory': ('system_problem', 'CRITICAL', 'Out of memory killer activated'), + r'segfault': ('system_problem', 'WARNING', 'Segmentation fault detected'), + r'BUG:': ('system_problem', 'CRITICAL', 'Kernel BUG detected'), + r'Call Trace:': ('system_problem', 'WARNING', 'Kernel call trace'), + r'I/O error.*dev\s+(\S+)': ('disk_io_error', 'CRITICAL', 'Disk I/O error'), + r'EXT4-fs error': ('disk_io_error', 'CRITICAL', 'Filesystem error'), + r'BTRFS error': ('disk_io_error', 'CRITICAL', 'Filesystem error'), + r'XFS.*error': ('disk_io_error', 'CRITICAL', 'Filesystem error'), + r'ZFS.*error': ('disk_io_error', 'CRITICAL', 'ZFS pool error'), + r'mce:.*Hardware Error': ('system_problem', 'CRITICAL', 'Hardware error (MCE)'), + } + + for pattern, (event_type, severity, reason) in critical_patterns.items(): + if re.search(pattern, msg, re.IGNORECASE): + data = {'reason': reason, 'hostname': self._hostname} + + # Try to extract device for disk errors + dev_match = re.search(r'dev\s+(\S+)', msg) + if dev_match and event_type == 'disk_io_error': + data['device'] = dev_match.group(1) + + self._emit(event_type, severity, data) + return + + def _check_service_failure(self, msg: str, unit: str): + """Detect critical service failures.""" + service_patterns = [ + r'Failed to start (.+)', + r'Unit (\S+) (?:entered failed state|failed)', + r'(\S+)\.service: (?:Main process exited|Failed with result)', + ] + + for pattern in service_patterns: + match = re.search(pattern, msg) + if match: + service_name = match.group(1) + self._emit('service_fail', 'WARNING', { + 'service_name': service_name, + 'reason': msg[:200], + 
'hostname': self._hostname, + }) + return + + def _check_disk_io(self, msg: str, syslog_id: str, priority: int): + """Detect disk I/O errors from kernel messages.""" + if syslog_id != 'kernel' and priority > 3: + return + + io_patterns = [ + r'blk_update_request: I/O error.*dev (\S+)', + r'Buffer I/O error on device (\S+)', + r'SCSI error.*sd(\w)', + r'ata\d+.*error', + ] + + for pattern in io_patterns: + match = re.search(pattern, msg) + if match: + device = match.group(1) if match.lastindex else 'unknown' + self._emit('disk_io_error', 'CRITICAL', { + 'device': device, + 'reason': msg[:200], + 'hostname': self._hostname, + }) + return + + def _check_cluster_events(self, msg: str, syslog_id: str): + """Detect cluster split-brain and node disconnect.""" + msg_lower = msg.lower() + + # Split-brain + if any(p in msg_lower for p in ['split-brain', 'split brain', + 'fencing required', 'cluster partition']): + quorum = 'unknown' + if 'quorum' in msg_lower: + quorum = 'lost' if 'lost' in msg_lower else 'valid' + + self._emit('split_brain', 'CRITICAL', { + 'quorum': quorum, + 'reason': msg[:200], + 'hostname': self._hostname, + }) + return + + # Node disconnect + if (('quorum' in msg_lower and 'lost' in msg_lower) or + ('node' in msg_lower and any(w in msg_lower for w in ['left', 'offline', 'lost']))): + + node_match = re.search(r'[Nn]ode\s+(\S+)', msg) + node_name = node_match.group(1) if node_match else 'unknown' + + self._emit('node_disconnect', 'CRITICAL', { + 'node_name': node_name, + 'hostname': self._hostname, + }) + + def _check_system_shutdown(self, msg: str, syslog_id: str): + """Detect system shutdown/reboot.""" + if 'systemd-journald' in syslog_id or 'systemd' in syslog_id: + if 'Journal stopped' in msg or 'Stopping Journal Service' in msg: + self._emit('system_shutdown', 'WARNING', { + 'reason': 'System journal stopped', + 'hostname': self._hostname, + }) + elif 'Shutting down' in msg or 'System is rebooting' in msg: + event = 'system_reboot' if 'reboot' in 
msg.lower() else 'system_shutdown' + self._emit(event, 'WARNING', { + 'reason': msg[:200], + 'hostname': self._hostname, + }) + + def _check_permission_change(self, msg: str, syslog_id: str): + """Detect user permission changes in PVE.""" + permission_patterns = [ + (r'set permissions.*user\s+(\S+)', 'Permission changed'), + (r'user added to group.*?(\S+)', 'Added to group'), + (r'user removed from group.*?(\S+)', 'Removed from group'), + (r'ACL updated.*?(\S+)', 'ACL updated'), + (r'Role assigned.*?(\S+)', 'Role assigned'), + ] + + for pattern, action in permission_patterns: + match = re.search(pattern, msg, re.IGNORECASE) + if match: + username = match.group(1) + self._emit('user_permission_change', 'INFO', { + 'username': username, + 'change_details': action, + 'hostname': self._hostname, + }) + return + + def _check_firewall(self, msg: str, syslog_id: str): + """Detect firewall issues (not individual drops, but rule errors).""" + if re.search(r'pve-firewall.*(?:error|failed|unable)', msg, re.IGNORECASE): + self._emit('firewall_issue', 'WARNING', { + 'reason': msg[:200], + 'hostname': self._hostname, + }) + + # ── Emit helper ── + + def _emit(self, event_type: str, severity: str, data: Dict): + """Emit event to queue with deduplication.""" + dedup_key = f"{event_type}:{data.get('source_ip', '')}:{data.get('device', '')}:{data.get('service_name', '')}" + + now = time.time() + last = self._recent_events.get(dedup_key, 0) + if now - last < self._dedup_window: + return # Skip duplicate + + self._recent_events[dedup_key] = now + + # Cleanup old dedup entries periodically + if len(self._recent_events) > 200: + cutoff = now - self._dedup_window * 2 + self._recent_events = { + k: v for k, v in self._recent_events.items() if v > cutoff + } + + self._queue.put(NotificationEvent(event_type, severity, data, source='journal')) + + +# ─── Task Watcher (Real-time) ──────────────────────────────────── + +class TaskWatcher: + """Watches /var/log/pve/tasks/index for VM/CT and 
backup events. + + The PVE task index file is appended when tasks start/finish. + Format: UPID:node:pid:pstart:starttime:type:id:user: + Final status is recorded when task completes. + """ + + TASK_LOG = '/var/log/pve/tasks/index' + + # Map PVE task types to our event types + TASK_MAP = { + 'qmstart': ('vm_start', 'INFO'), + 'qmstop': ('vm_stop', 'INFO'), + 'qmshutdown': ('vm_shutdown', 'INFO'), + 'qmreboot': ('vm_restart', 'INFO'), + 'qmreset': ('vm_restart', 'INFO'), + 'vzstart': ('ct_start', 'INFO'), + 'vzstop': ('ct_stop', 'INFO'), + 'vzshutdown': ('ct_stop', 'INFO'), + 'vzdump': ('backup_start', 'INFO'), + 'qmsnapshot': ('snapshot_complete', 'INFO'), + 'vzsnapshot': ('snapshot_complete', 'INFO'), + 'qmigrate': ('migration_start', 'INFO'), + 'vzmigrate': ('migration_start', 'INFO'), + } + + def __init__(self, event_queue: Queue): + self._queue = event_queue + self._running = False + self._thread: Optional[threading.Thread] = None + self._hostname = _hostname() + self._last_position = 0 + + def start(self): + if self._running: + return + self._running = True + + # Start at end of file + if os.path.exists(self.TASK_LOG): + try: + self._last_position = os.path.getsize(self.TASK_LOG) + except OSError: + self._last_position = 0 + + self._thread = threading.Thread(target=self._watch_loop, daemon=True, + name='task-watcher') + self._thread.start() + + def stop(self): + self._running = False + + def _watch_loop(self): + """Poll the task index file for new entries.""" + while self._running: + try: + if os.path.exists(self.TASK_LOG): + current_size = os.path.getsize(self.TASK_LOG) + + if current_size < self._last_position: + # File was truncated/rotated + self._last_position = 0 + + if current_size > self._last_position: + with open(self.TASK_LOG, 'r') as f: + f.seek(self._last_position) + new_lines = f.readlines() + self._last_position = f.tell() + + for line in new_lines: + self._process_task_line(line.strip()) + except Exception as e: + print(f"[TaskWatcher] Error 
reading task log: {e}") + + time.sleep(2) # Check every 2 seconds + + def _process_task_line(self, line: str): + """Process a single task index line. + + PVE task index format (space-separated): + UPID endtime status + Where UPID = UPID:node:pid:pstart:starttime:type:id:user: + """ + if not line: + return + + parts = line.split() + if not parts: + return + + upid = parts[0] + status = parts[2] if len(parts) >= 3 else '' + + # Parse UPID + upid_parts = upid.split(':') + if len(upid_parts) < 8: + return + + task_type = upid_parts[5] + vmid = upid_parts[6] + user = upid_parts[7] + + # Get VM/CT name + vmname = self._get_vm_name(vmid) if vmid else '' + + # Map to event type + event_info = self.TASK_MAP.get(task_type) + if not event_info: + return + + event_type, default_severity = event_info + + # Check if task failed + is_error = status and status != 'OK' and status != '' + + if is_error: + # Override to failure event + if 'start' in event_type: + event_type = event_type.replace('_start', '_fail') + elif 'complete' in event_type: + event_type = event_type.replace('_complete', '_fail') + severity = 'CRITICAL' + elif status == 'OK': + # Task completed successfully + if event_type == 'backup_start': + event_type = 'backup_complete' + elif event_type == 'migration_start': + event_type = 'migration_complete' + severity = 'INFO' + else: + # Task just started (no status yet) + severity = default_severity + + data = { + 'vmid': vmid, + 'vmname': vmname or f'ID {vmid}', + 'hostname': self._hostname, + 'user': user, + 'reason': status if is_error else '', + 'target_node': '', + 'size': '', + 'snapshot_name': '', + } + + self._queue.put(NotificationEvent(event_type, severity, data, source='task')) + + def _get_vm_name(self, vmid: str) -> str: + """Try to resolve VMID to name via config files.""" + if not vmid: + return '' + + # Try QEMU + conf_path = f'/etc/pve/qemu-server/{vmid}.conf' + name = self._read_name_from_conf(conf_path) + if name: + return name + + # Try LXC + 
conf_path = f'/etc/pve/lxc/{vmid}.conf' + name = self._read_name_from_conf(conf_path) + if name: + return name + + return '' + + @staticmethod + def _read_name_from_conf(path: str) -> str: + """Read 'name:' or 'hostname:' from PVE config file.""" + try: + if not os.path.exists(path): + return '' + with open(path, 'r') as f: + for line in f: + if line.startswith('name:'): + return line.split(':', 1)[1].strip() + if line.startswith('hostname:'): + return line.split(':', 1)[1].strip() + except (IOError, PermissionError): + pass + return '' + + +# ─── Polling Collector ──────────────────────────────────────────── + +class PollingCollector: + """Periodic collector that reads Health Monitor pending notifications. + + Polls health_persistence for: + - Pending notification events (state changes from Bloque A) + - Unnotified errors + - Update availability (every 24h) + """ + + def __init__(self, event_queue: Queue, poll_interval: int = 30): + self._queue = event_queue + self._running = False + self._thread: Optional[threading.Thread] = None + self._poll_interval = poll_interval + self._hostname = _hostname() + self._last_update_check = 0 + self._update_check_interval = 86400 # 24 hours + + def start(self): + if self._running: + return + self._running = True + self._thread = threading.Thread(target=self._poll_loop, daemon=True, + name='polling-collector') + self._thread.start() + + def stop(self): + self._running = False + + def _poll_loop(self): + """Main polling loop.""" + # Initial delay to let health monitor warm up + for _ in range(10): + if not self._running: + return + time.sleep(1) + + while self._running: + try: + self._collect_health_events() + self._check_updates() + except Exception as e: + print(f"[PollingCollector] Error: {e}") + + # Sleep in small increments for responsive shutdown + for _ in range(self._poll_interval): + if not self._running: + return + time.sleep(1) + + def _collect_health_events(self): + """Collect pending notification events from 
health_persistence.""" + try: + from health_persistence import health_persistence + + # Get pending notification events + events = health_persistence.get_pending_notifications() + for evt in events: + data = json.loads(evt.get('data', '{}')) if isinstance(evt.get('data'), str) else evt.get('data', {}) + + event_type = evt.get('event_type', 'state_change') + severity = data.get('severity', 'WARNING') + + data['hostname'] = self._hostname + data['error_key'] = evt.get('error_key', '') + + self._queue.put(NotificationEvent( + event_type, severity, data, source='health_monitor' + )) + + # Mark events as notified + if events: + event_ids = [e['id'] for e in events if 'id' in e] + if event_ids: + health_persistence.mark_events_notified(event_ids) + + # Also check unnotified errors + unnotified = health_persistence.get_unnotified_errors() + for error in unnotified: + self._queue.put(NotificationEvent( + 'new_error', error.get('severity', 'WARNING'), { + 'category': error.get('category', ''), + 'reason': error.get('reason', ''), + 'hostname': self._hostname, + 'error_key': error.get('error_key', ''), + }, + source='health_monitor' + )) + # Mark as notified + if 'id' in error: + health_persistence.mark_notified(error['id']) + + except ImportError: + pass # health_persistence not available (CLI mode) + except Exception as e: + print(f"[PollingCollector] Health event collection error: {e}") + + def _check_updates(self): + """Check for available system updates (every 24h).""" + now = time.time() + if now - self._last_update_check < self._update_check_interval: + return + + self._last_update_check = now + + try: + result = subprocess.run( + ['apt-get', '-s', 'upgrade'], + capture_output=True, text=True, timeout=60 + ) + + if result.returncode == 0: + # Count upgradeable packages + lines = [l for l in result.stdout.split('\n') + if l.startswith('Inst ')] + count = len(lines) + + if count > 0: + # Show first 5 package names + packages = [l.split()[1] for l in lines[:5]] + details 
= ', '.join(packages) + if count > 5: + details += f', ... and {count - 5} more' + + self._queue.put(NotificationEvent( + 'update_available', 'INFO', { + 'count': str(count), + 'details': details, + 'hostname': self._hostname, + }, + source='polling' + )) + except Exception: + pass # Non-critical, silently skip diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py new file mode 100644 index 00000000..19704cb1 --- /dev/null +++ b/AppImage/scripts/notification_manager.py @@ -0,0 +1,770 @@ +""" +ProxMenux Notification Manager +Central orchestrator for the notification service. + +Connects: +- notification_channels.py (transport: Telegram, Gotify, Discord) +- notification_templates.py (message formatting + optional AI) +- notification_events.py (event detection: Journal, Task, Polling watchers) +- health_persistence.py (DB: config storage, notification_history) + +Two interfaces consume this module: +1. Server mode: Flask imports and calls start()/stop()/send_notification() +2. CLI mode: `python3 notification_manager.py --action send --type vm_fail ...` + Scripts .sh in /usr/local/share/proxmenux/scripts call this directly. 
+ +Author: MacRimi +""" + +import json +import os +import sys +import time +import socket +import sqlite3 +import threading +from queue import Queue, Empty +from datetime import datetime +from typing import Dict, Any, List, Optional +from pathlib import Path + +# Ensure local imports work +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +if BASE_DIR not in sys.path: + sys.path.insert(0, BASE_DIR) + +from notification_channels import create_channel, CHANNEL_TYPES +from notification_templates import ( + render_template, format_with_ai, TEMPLATES, + EVENT_GROUPS, get_event_types_by_group, get_default_enabled_events +) +from notification_events import ( + JournalWatcher, TaskWatcher, PollingCollector, NotificationEvent +) + + +# ─── Constants ──────────────────────────────────────────────────── + +DB_PATH = Path('/usr/local/share/proxmenux/health_monitor.db') +SETTINGS_PREFIX = 'notification.' + +# Cooldown defaults (seconds) +DEFAULT_COOLDOWNS = { + 'CRITICAL': 0, # No cooldown for critical + 'WARNING': 300, # 5 min + 'INFO': 900, # 15 min + 'resources': 900, # 15 min for resource alerts + 'updates': 86400, # 24h for update notifications +} + + +# ─── Notification Manager ───────────────────────────────────────── + +class NotificationManager: + """Central notification orchestrator. + + Manages channels, event watchers, deduplication, and dispatch. + Can run in server mode (background threads) or CLI mode (one-shot). 
+ """ + + def __init__(self): + self._channels: Dict[str, Any] = {} # channel_name -> channel_instance + self._event_queue: Queue = Queue() + self._running = False + self._config: Dict[str, str] = {} + self._enabled = False + self._lock = threading.Lock() + + # Watchers + self._journal_watcher: Optional[JournalWatcher] = None + self._task_watcher: Optional[TaskWatcher] = None + self._polling_collector: Optional[PollingCollector] = None + self._dispatch_thread: Optional[threading.Thread] = None + + # Cooldown tracking: {event_type_or_key: last_sent_timestamp} + self._cooldowns: Dict[str, float] = {} + + # Stats + self._stats = { + 'started_at': None, + 'total_sent': 0, + 'total_errors': 0, + 'last_sent_at': None, + } + + # ─── Configuration ────────────────────────────────────────── + + def _load_config(self): + """Load notification settings from the shared SQLite database.""" + self._config = {} + try: + if not DB_PATH.exists(): + return + + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + cursor = conn.cursor() + cursor.execute( + 'SELECT setting_key, setting_value FROM user_settings WHERE setting_key LIKE ?', + (f'{SETTINGS_PREFIX}%',) + ) + for key, value in cursor.fetchall(): + # Strip prefix for internal use + short_key = key[len(SETTINGS_PREFIX):] + self._config[short_key] = value + conn.close() + except Exception as e: + print(f"[NotificationManager] Failed to load config: {e}") + + self._enabled = self._config.get('enabled', 'false') == 'true' + self._rebuild_channels() + + def _save_setting(self, key: str, value: str): + """Save a single notification setting to the database.""" + full_key = f'{SETTINGS_PREFIX}{key}' + now = datetime.now().isoformat() + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + cursor = conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO 
user_settings (setting_key, setting_value, updated_at) + VALUES (?, ?, ?) + ''', (full_key, value, now)) + conn.commit() + conn.close() + self._config[key] = value + except Exception as e: + print(f"[NotificationManager] Failed to save setting {key}: {e}") + + def _rebuild_channels(self): + """Rebuild channel instances from current config.""" + self._channels = {} + + for ch_type in CHANNEL_TYPES: + enabled_key = f'{ch_type}.enabled' + if self._config.get(enabled_key) != 'true': + continue + + # Gather config keys for this channel + ch_config = {} + for config_key in CHANNEL_TYPES[ch_type]['config_keys']: + full_key = f'{ch_type}.{config_key}' + ch_config[config_key] = self._config.get(full_key, '') + + channel = create_channel(ch_type, ch_config) + if channel: + valid, err = channel.validate_config() + if valid: + self._channels[ch_type] = channel + else: + print(f"[NotificationManager] Channel {ch_type} invalid: {err}") + + def reload_config(self): + """Reload config from DB without restarting.""" + with self._lock: + self._load_config() + return {'success': True, 'channels': list(self._channels.keys())} + + # ─── Server Mode (Background) ────────────────────────────── + + def start(self): + """Start the notification service in server mode. + + Launches watchers and dispatch loop as daemon threads. + Called by flask_server.py on startup. + """ + if self._running: + return + + self._load_config() + + if not self._enabled: + print("[NotificationManager] Service is disabled. 
Skipping start.") + return + + self._running = True + self._stats['started_at'] = datetime.now().isoformat() + + # Start event watchers + self._journal_watcher = JournalWatcher(self._event_queue) + self._task_watcher = TaskWatcher(self._event_queue) + self._polling_collector = PollingCollector(self._event_queue) + + self._journal_watcher.start() + self._task_watcher.start() + self._polling_collector.start() + + # Start dispatch loop + self._dispatch_thread = threading.Thread( + target=self._dispatch_loop, daemon=True, name='notification-dispatch' + ) + self._dispatch_thread.start() + + print(f"[NotificationManager] Started with channels: {list(self._channels.keys())}") + + def stop(self): + """Stop the notification service cleanly.""" + self._running = False + + if self._journal_watcher: + self._journal_watcher.stop() + if self._task_watcher: + self._task_watcher.stop() + if self._polling_collector: + self._polling_collector.stop() + + print("[NotificationManager] Stopped.") + + def _dispatch_loop(self): + """Main dispatch loop: reads queue -> filters -> formats -> sends -> records.""" + while self._running: + try: + event = self._event_queue.get(timeout=2) + except Empty: + continue + + try: + self._process_event(event) + except Exception as e: + print(f"[NotificationManager] Dispatch error: {e}") + + def _process_event(self, event: NotificationEvent): + """Process a single event from the queue.""" + if not self._enabled: + return + + # Check if this event type is enabled in settings + event_setting = f'events.{event.event_type}' + if self._config.get(event_setting, 'true') == 'false': + return + + # Check severity filter + min_severity = self._config.get('filter.min_severity', 'INFO') + if not self._meets_severity(event.severity, min_severity): + return + + # Check cooldown + if not self._check_cooldown(event): + return + + # Render message from template + rendered = render_template(event.event_type, event.data) + + # Optional AI enhancement + ai_config = { + 
'enabled': self._config.get('ai_enabled', 'false'), + 'provider': self._config.get('ai_provider', ''), + 'api_key': self._config.get('ai_api_key', ''), + 'model': self._config.get('ai_model', ''), + } + body = format_with_ai( + rendered['title'], rendered['body'], rendered['severity'], ai_config + ) + + # Send through all active channels + self._dispatch_to_channels( + rendered['title'], body, rendered['severity'], + event.event_type, event.data, event.source + ) + + def _dispatch_to_channels(self, title: str, body: str, severity: str, + event_type: str, data: Dict, source: str): + """Send notification through all configured channels.""" + with self._lock: + channels = dict(self._channels) + + for ch_name, channel in channels.items(): + try: + result = channel.send(title, body, severity, data) + self._record_history( + event_type, ch_name, title, body, severity, + result.get('success', False), + result.get('error', ''), + source + ) + + if result.get('success'): + self._stats['total_sent'] += 1 + self._stats['last_sent_at'] = datetime.now().isoformat() + else: + self._stats['total_errors'] += 1 + print(f"[NotificationManager] Send failed ({ch_name}): {result.get('error')}") + + except Exception as e: + self._stats['total_errors'] += 1 + self._record_history( + event_type, ch_name, title, body, severity, + False, str(e), source + ) + + # ─── Cooldown / Dedup ─────────────────────────────────────── + + def _check_cooldown(self, event: NotificationEvent) -> bool: + """Check if the event passes cooldown rules.""" + now = time.time() + + # Determine cooldown period + template = TEMPLATES.get(event.event_type, {}) + group = template.get('group', 'system') + + # Priority: per-type config > per-severity > default + cooldown_key = f'cooldown.{event.event_type}' + cooldown_str = self._config.get(cooldown_key) + + if cooldown_str is None: + cooldown_key_group = f'cooldown.{group}' + cooldown_str = self._config.get(cooldown_key_group) + + if cooldown_str is not None: + 
cooldown = int(cooldown_str) + else: + cooldown = DEFAULT_COOLDOWNS.get(event.severity, 300) + + # CRITICAL events have zero cooldown by default + if event.severity == 'CRITICAL' and cooldown_str is None: + cooldown = 0 + + # Check against last sent time + dedup_key = f"{event.event_type}:{event.data.get('category', '')}:{event.data.get('vmid', '')}" + last_sent = self._cooldowns.get(dedup_key, 0) + + if now - last_sent < cooldown: + return False + + self._cooldowns[dedup_key] = now + return True + + @staticmethod + def _meets_severity(event_severity: str, min_severity: str) -> bool: + """Check if event severity meets the minimum threshold.""" + levels = {'INFO': 0, 'WARNING': 1, 'CRITICAL': 2} + return levels.get(event_severity, 0) >= levels.get(min_severity, 0) + + # ─── History Recording ────────────────────────────────────── + + def _record_history(self, event_type: str, channel: str, title: str, + message: str, severity: str, success: bool, + error_message: str, source: str): + """Record a notification attempt in the history table.""" + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO notification_history + (event_type, channel, title, message, severity, sent_at, success, error_message, source) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + event_type, channel, title, message[:500], severity, + datetime.now().isoformat(), 1 if success else 0, + error_message[:500] if error_message else None, source + )) + conn.commit() + conn.close() + except Exception as e: + print(f"[NotificationManager] History record error: {e}") + + # ─── Public API (used by Flask routes and CLI) ────────────── + + def send_notification(self, event_type: str, severity: str, + title: str, message: str, + data: Optional[Dict] = None, + source: str = 'api') -> Dict[str, Any]: + """Send a notification directly (bypasses queue and cooldown). 
+ + Used by CLI and API for explicit sends. + """ + if not self._channels: + self._load_config() + + if not self._channels: + return { + 'success': False, + 'error': 'No channels configured or enabled', + 'channels_sent': [], + } + + # Render template if available + if event_type in TEMPLATES and not message: + rendered = render_template(event_type, data or {}) + title = title or rendered['title'] + message = rendered['body'] + severity = severity or rendered['severity'] + + # AI enhancement + ai_config = { + 'enabled': self._config.get('ai_enabled', 'false'), + 'provider': self._config.get('ai_provider', ''), + 'api_key': self._config.get('ai_api_key', ''), + 'model': self._config.get('ai_model', ''), + } + message = format_with_ai(title, message, severity, ai_config) + + results = {} + channels_sent = [] + errors = [] + + with self._lock: + channels = dict(self._channels) + + for ch_name, channel in channels.items(): + try: + result = channel.send(title, message, severity, data) + results[ch_name] = result + + self._record_history( + event_type, ch_name, title, message, severity, + result.get('success', False), + result.get('error', ''), + source + ) + + if result.get('success'): + channels_sent.append(ch_name) + else: + errors.append(f"{ch_name}: {result.get('error')}") + except Exception as e: + errors.append(f"{ch_name}: {str(e)}") + + return { + 'success': len(channels_sent) > 0, + 'channels_sent': channels_sent, + 'errors': errors, + 'total_channels': len(channels), + } + + def send_raw(self, title: str, message: str, + severity: str = 'INFO', + source: str = 'api') -> Dict[str, Any]: + """Send a raw message without template (for custom scripts).""" + return self.send_notification( + 'custom', severity, title, message, source=source + ) + + def test_channel(self, channel_name: str = 'all') -> Dict[str, Any]: + """Test one or all configured channels.""" + if not self._channels: + self._load_config() + + if not self._channels: + return {'success': False, 
'error': 'No channels configured'} + + results = {} + + if channel_name == 'all': + targets = dict(self._channels) + elif channel_name in self._channels: + targets = {channel_name: self._channels[channel_name]} + else: + # Try to create channel from config even if not enabled + ch_config = {} + for config_key in CHANNEL_TYPES.get(channel_name, {}).get('config_keys', []): + ch_config[config_key] = self._config.get(f'{channel_name}.{config_key}', '') + + channel = create_channel(channel_name, ch_config) + if channel: + targets = {channel_name: channel} + else: + return {'success': False, 'error': f'Channel {channel_name} not configured'} + + for ch_name, channel in targets.items(): + success, error = channel.test() + results[ch_name] = {'success': success, 'error': error} + + self._record_history( + 'test', ch_name, 'ProxMenux Test', + 'Test notification', 'INFO', + success, error, 'api' + ) + + overall_success = any(r['success'] for r in results.values()) + return { + 'success': overall_success, + 'results': results, + } + + def get_status(self) -> Dict[str, Any]: + """Get current service status.""" + if not self._config: + self._load_config() + + return { + 'enabled': self._enabled, + 'running': self._running, + 'channels': { + name: { + 'type': name, + 'connected': True, + } + for name in self._channels + }, + 'stats': self._stats, + 'watchers': { + 'journal': self._journal_watcher is not None and self._running, + 'task': self._task_watcher is not None and self._running, + 'polling': self._polling_collector is not None and self._running, + }, + } + + def set_enabled(self, enabled: bool) -> Dict[str, Any]: + """Enable or disable the notification service.""" + self._save_setting('enabled', 'true' if enabled else 'false') + self._enabled = enabled + + if enabled and not self._running: + self.start() + elif not enabled and self._running: + self.stop() + + return {'success': True, 'enabled': enabled} + + def list_channels(self) -> Dict[str, Any]: + """List all channel 
types with their configuration status.""" + if not self._config: + self._load_config() + + channels_info = {} + for ch_type, info in CHANNEL_TYPES.items(): + enabled = self._config.get(f'{ch_type}.enabled', 'false') == 'true' + configured = all( + bool(self._config.get(f'{ch_type}.{k}', '')) + for k in info['config_keys'] + ) + channels_info[ch_type] = { + 'name': info['name'], + 'enabled': enabled, + 'configured': configured, + 'active': ch_type in self._channels, + } + + return {'channels': channels_info} + + def get_history(self, limit: int = 50, offset: int = 0, + severity: str = '', channel: str = '') -> Dict[str, Any]: + """Get notification history with optional filters.""" + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + query = 'SELECT * FROM notification_history WHERE 1=1' + params: list = [] + + if severity: + query += ' AND severity = ?' + params.append(severity) + if channel: + query += ' AND channel = ?' + params.append(channel) + + query += ' ORDER BY sent_at DESC LIMIT ? OFFSET ?' + params.extend([limit, offset]) + + cursor.execute(query, params) + rows = [dict(row) for row in cursor.fetchall()] + + # Get total count + count_query = 'SELECT COUNT(*) FROM notification_history WHERE 1=1' + count_params: list = [] + if severity: + count_query += ' AND severity = ?' + count_params.append(severity) + if channel: + count_query += ' AND channel = ?' 
+ count_params.append(channel) + + cursor.execute(count_query, count_params) + total = cursor.fetchone()[0] + + conn.close() + + return { + 'history': rows, + 'total': total, + 'limit': limit, + 'offset': offset, + } + except Exception as e: + return {'history': [], 'total': 0, 'error': str(e)} + + def clear_history(self) -> Dict[str, Any]: + """Clear all notification history.""" + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + conn.execute('DELETE FROM notification_history') + conn.commit() + conn.close() + return {'success': True} + except Exception as e: + return {'success': False, 'error': str(e)} + + def get_settings(self) -> Dict[str, Any]: + """Get all notification settings for the UI.""" + if not self._config: + self._load_config() + + return { + 'enabled': self._enabled, + 'settings': {f'{SETTINGS_PREFIX}{k}': v for k, v in self._config.items()}, + 'channels': self.list_channels()['channels'], + 'event_groups': EVENT_GROUPS, + 'event_types': get_event_types_by_group(), + 'default_events': get_default_enabled_events(), + } + + def save_settings(self, settings: Dict[str, str]) -> Dict[str, Any]: + """Save multiple notification settings at once.""" + try: + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + cursor = conn.cursor() + now = datetime.now().isoformat() + + for key, value in settings.items(): + # Accept both prefixed and unprefixed keys + full_key = key if key.startswith(SETTINGS_PREFIX) else f'{SETTINGS_PREFIX}{key}' + short_key = full_key[len(SETTINGS_PREFIX):] + + cursor.execute(''' + INSERT OR REPLACE INTO user_settings (setting_key, setting_value, updated_at) + VALUES (?, ?, ?) 
+ ''', (full_key, str(value), now)) + + self._config[short_key] = str(value) + + conn.commit() + conn.close() + + # Rebuild channels with new config + self._enabled = self._config.get('enabled', 'false') == 'true' + self._rebuild_channels() + + return {'success': True, 'channels_active': list(self._channels.keys())} + except Exception as e: + return {'success': False, 'error': str(e)} + + +# ─── Singleton (for server mode) ───────────────────────────────── + +notification_manager = NotificationManager() + + +# ─── CLI Interface ──────────────────────────────────────────────── + +def _print_result(result: Dict, as_json: bool): + """Print CLI result in human-readable or JSON format.""" + if as_json: + print(json.dumps(result, indent=2, default=str)) + return + + if result.get('success'): + print(f"OK: ", end='') + elif 'success' in result and not result['success']: + print(f"ERROR: ", end='') + + # Format based on content + if 'channels_sent' in result: + sent = result.get('channels_sent', []) + print(f"Sent via: {', '.join(sent) if sent else 'none'}") + if result.get('errors'): + for err in result['errors']: + print(f" Error: {err}") + elif 'results' in result: + for ch, r in result['results'].items(): + status = 'OK' if r['success'] else f"FAILED: {r['error']}" + print(f" {ch}: {status}") + elif 'channels' in result: + for ch, info in result['channels'].items(): + status = 'active' if info.get('active') else ('configured' if info.get('configured') else 'not configured') + enabled = 'enabled' if info.get('enabled') else 'disabled' + print(f" {info['name']}: {enabled}, {status}") + elif 'enabled' in result and 'running' in result: + print(f"Enabled: {result['enabled']}, Running: {result['running']}") + if result.get('stats'): + stats = result['stats'] + print(f" Total sent: {stats.get('total_sent', 0)}") + print(f" Total errors: {stats.get('total_errors', 0)}") + if stats.get('last_sent_at'): + print(f" Last sent: {stats['last_sent_at']}") + elif 'enabled' in result: 
+ print(f"Service {'enabled' if result['enabled'] else 'disabled'}") + else: + print(json.dumps(result, indent=2, default=str)) + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser( + description='ProxMenux Notification Manager CLI', + epilog='Example: python3 notification_manager.py --action send --type vm_fail --severity CRITICAL --title "VM 100 failed" --message "QEMU process crashed"' + ) + parser.add_argument('--action', required=True, + choices=['send', 'send-raw', 'test', 'status', + 'enable', 'disable', 'list-channels'], + help='Action to perform') + parser.add_argument('--type', help='Event type for send action (e.g. vm_fail, backup_complete)') + parser.add_argument('--severity', default='INFO', + choices=['INFO', 'WARNING', 'CRITICAL'], + help='Notification severity (default: INFO)') + parser.add_argument('--title', help='Notification title') + parser.add_argument('--message', help='Notification message body') + parser.add_argument('--channel', default='all', + help='Specific channel for test (default: all)') + parser.add_argument('--json', action='store_true', + help='Output result as JSON') + + args = parser.parse_args() + + mgr = NotificationManager() + mgr._load_config() + + if args.action == 'send': + if not args.type: + parser.error('--type is required for send action') + result = mgr.send_notification( + args.type, args.severity, + args.title or '', args.message or '', + data={ + 'hostname': socket.gethostname().split('.')[0], + 'reason': args.message or '', + }, + source='cli' + ) + + elif args.action == 'send-raw': + if not args.title or not args.message: + parser.error('--title and --message are required for send-raw') + result = mgr.send_raw(args.title, args.message, args.severity, source='cli') + + elif args.action == 'test': + result = mgr.test_channel(args.channel) + + elif args.action == 'status': + result = mgr.get_status() + + elif args.action == 'enable': + result = mgr.set_enabled(True) + + elif 
args.action == 'disable': + result = mgr.set_enabled(False) + + elif args.action == 'list-channels': + result = mgr.list_channels() + + else: + result = {'error': f'Unknown action: {args.action}'} + + _print_result(result, args.json) + + # Exit with appropriate code + sys.exit(0 if result.get('success', True) else 1) diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py new file mode 100644 index 00000000..cf4ac832 --- /dev/null +++ b/AppImage/scripts/notification_templates.py @@ -0,0 +1,554 @@ +""" +ProxMenux Notification Templates +Message templates for all event types with per-channel formatting. + +Templates use Python str.format() variables: + {hostname}, {severity}, {category}, {reason}, {summary}, + {previous}, {current}, {vmid}, {vmname}, {timestamp}, etc. + +Optional AI enhancement enriches messages with context/suggestions. + +Author: MacRimi +""" + +import json +import socket +import time +import urllib.request +import urllib.error +from typing import Dict, Any, Optional + + +# ─── Severity Icons ────────────────────────────────────────────── + +SEVERITY_ICONS = { + 'CRITICAL': '\U0001F534', + 'WARNING': '\U0001F7E1', + 'INFO': '\U0001F535', + 'OK': '\U0001F7E2', + 'UNKNOWN': '\u26AA', +} + +SEVERITY_ICONS_DISCORD = { + 'CRITICAL': ':red_circle:', + 'WARNING': ':yellow_circle:', + 'INFO': ':blue_circle:', + 'OK': ':green_circle:', + 'UNKNOWN': ':white_circle:', +} + + +# ─── Event Templates ───────────────────────────────────────────── +# Each template has a 'title' and 'body' with {variable} placeholders. +# 'group' is used for UI event filter grouping. +# 'default_enabled' controls initial state in settings. 
TEMPLATES = {
    # ── Health Monitor state changes ──
    'state_change': {
        'title': '{hostname}: {category} changed to {current}',
        'body': '{category} status changed from {previous} to {current}.\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'new_error': {
        'title': '{hostname}: New {severity} - {category}',
        'body': '{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'error_resolved': {
        'title': '{hostname}: Resolved - {category}',
        'body': '{reason}\nDuration: {duration}',
        'group': 'system',
        'default_enabled': True,
    },
    'error_escalated': {
        'title': '{hostname}: Escalated to {severity} - {category}',
        'body': '{reason}',
        'group': 'system',
        'default_enabled': True,
    },

    # ── VM / CT events ──
    'vm_start': {
        'title': '{hostname}: VM {vmid} started',
        'body': '{vmname} ({vmid}) has been started.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'vm_stop': {
        'title': '{hostname}: VM {vmid} stopped',
        'body': '{vmname} ({vmid}) has been stopped.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'vm_shutdown': {
        'title': '{hostname}: VM {vmid} shutdown',
        'body': '{vmname} ({vmid}) has been shut down.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'vm_fail': {
        'title': '{hostname}: VM {vmid} FAILED',
        'body': '{vmname} ({vmid}) has failed.\n{reason}',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'vm_restart': {
        'title': '{hostname}: VM {vmid} restarted',
        'body': '{vmname} ({vmid}) has been restarted.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_start': {
        'title': '{hostname}: CT {vmid} started',
        'body': '{vmname} ({vmid}) has been started.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'ct_stop': {
        'title': '{hostname}: CT {vmid} stopped',
        'body': '{vmname} ({vmid}) has been stopped.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_fail': {
        'title': '{hostname}: CT {vmid} FAILED',
        'body': '{vmname} ({vmid}) has failed.\n{reason}',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_start': {
        'title': '{hostname}: Migration started - {vmid}',
        'body': '{vmname} ({vmid}) migration to {target_node} started.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_complete': {
        'title': '{hostname}: Migration complete - {vmid}',
        'body': '{vmname} ({vmid}) migrated successfully to {target_node}.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_fail': {
        'title': '{hostname}: Migration FAILED - {vmid}',
        'body': '{vmname} ({vmid}) migration to {target_node} failed.\n{reason}',
        'group': 'vm_ct',
        'default_enabled': True,
    },

    # ── Backup / Snapshot events ──
    'backup_start': {
        'title': '{hostname}: Backup started - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) has started.',
        'group': 'backup',
        'default_enabled': False,
    },
    'backup_complete': {
        'title': '{hostname}: Backup complete - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) completed successfully.\nSize: {size}',
        'group': 'backup',
        'default_enabled': True,
    },
    'backup_fail': {
        'title': '{hostname}: Backup FAILED - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) has failed.\n{reason}',
        'group': 'backup',
        'default_enabled': True,
    },
    'snapshot_complete': {
        'title': '{hostname}: Snapshot created - {vmid}',
        'body': 'Snapshot of {vmname} ({vmid}) created: {snapshot_name}',
        'group': 'backup',
        'default_enabled': False,
    },
    'snapshot_fail': {
        'title': '{hostname}: Snapshot FAILED - {vmid}',
        'body': 'Snapshot of {vmname} ({vmid}) failed.\n{reason}',
        'group': 'backup',
        'default_enabled': True,
    },

    # ── Resource events (from Health Monitor) ──
    'cpu_high': {
        'title': '{hostname}: High CPU usage ({value}%)',
        'body': 'CPU usage is at {value}% on {cores} cores.\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },
    'ram_high': {
        'title': '{hostname}: High memory usage ({value}%)',
        'body': 'Memory usage: {used} / {total} ({value}%).\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },
    'temp_high': {
        'title': '{hostname}: High temperature ({value}C)',
        'body': 'CPU temperature: {value}C (threshold: {threshold}C).\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },
    'disk_space_low': {
        'title': '{hostname}: Low disk space on {mount}',
        'body': '{mount}: {used}% used ({available} available).',
        'group': 'storage',
        'default_enabled': True,
    },
    'disk_io_error': {
        'title': '{hostname}: Disk I/O error',
        'body': 'I/O error detected on {device}.\n{reason}',
        'group': 'storage',
        'default_enabled': True,
    },
    'load_high': {
        'title': '{hostname}: High system load ({value})',
        'body': 'System load average: {value} on {cores} cores.\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },

    # ── Network events ──
    'network_down': {
        'title': '{hostname}: Network connectivity lost',
        'body': 'Network connectivity check failed.\n{reason}',
        'group': 'network',
        'default_enabled': True,
    },
    'network_latency': {
        'title': '{hostname}: High network latency ({value}ms)',
        'body': 'Latency to gateway: {value}ms (threshold: {threshold}ms).',
        'group': 'network',
        'default_enabled': False,
    },

    # ── Security events ──
    'auth_fail': {
        'title': '{hostname}: Authentication failure',
        'body': 'Failed login attempt from {source_ip}.\nUser: {username}\nService: {service}',
        'group': 'security',
        'default_enabled': True,
    },
    'ip_block': {
        'title': '{hostname}: IP blocked by Fail2Ban',
        'body': 'IP {source_ip} has been banned.\nJail: {jail}\nFailures: {failures}',
        'group': 'security',
        'default_enabled': True,
    },
    'firewall_issue': {
        'title': '{hostname}: Firewall issue detected',
        'body': '{reason}',
        'group': 'security',
        'default_enabled': True,
    },
    'user_permission_change': {
        'title': '{hostname}: User permission changed',
        'body': 'User: {username}\nChange: {change_details}',
        'group': 'security',
        'default_enabled': True,
    },

    # ── Cluster events ──
    'split_brain': {
        'title': '{hostname}: SPLIT-BRAIN detected',
        'body': 'Cluster split-brain condition detected.\nQuorum status: {quorum}',
        'group': 'cluster',
        'default_enabled': True,
    },
    'node_disconnect': {
        'title': '{hostname}: Node disconnected',
        'body': 'Node {node_name} has disconnected from the cluster.',
        'group': 'cluster',
        'default_enabled': True,
    },
    'node_reconnect': {
        'title': '{hostname}: Node reconnected',
        'body': 'Node {node_name} has reconnected to the cluster.',
        'group': 'cluster',
        'default_enabled': True,
    },

    # ── System events ──
    'system_shutdown': {
        'title': '{hostname}: System shutting down',
        'body': 'The system is shutting down.\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'system_reboot': {
        'title': '{hostname}: System rebooting',
        'body': 'The system is rebooting.\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'system_problem': {
        'title': '{hostname}: System problem detected',
        'body': '{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'service_fail': {
        'title': '{hostname}: Service failed - {service_name}',
        'body': 'Service {service_name} has failed.\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'update_available': {
        'title': '{hostname}: Updates available ({count})',
        'body': '{count} package updates are available.\n{details}',
        'group': 'system',
        'default_enabled': False,
    },
    'update_complete': {
        'title': '{hostname}: Update completed',
        'body': '{details}',
        'group': 'system',
        'default_enabled': False,
    },

    # ── Unknown persistent (from health monitor) ──
    'unknown_persistent': {
        'title': '{hostname}: Check unavailable - {category}',
        'body': 'Health check for {category} has been unavailable for 3+ cycles.\n{reason}',
        'group': 'system',
        'default_enabled': False,
    },
}

# ─── Event Groups (for UI filtering) ─────────────────────────────

EVENT_GROUPS = {
    'system': {'label': 'System', 'description': 'System health, services, updates'},
    'vm_ct': {'label': 'VM / CT', 'description': 'Virtual machines and containers'},
    'backup': {'label': 'Backup', 'description': 'Backups and snapshots'},
    'resources': {'label': 'Resources', 'description': 'CPU, memory, temperature, load'},
    'storage': {'label': 'Storage', 'description': 'Disk space and I/O'},
    'network': {'label': 'Network', 'description': 'Connectivity and latency'},
    'security': {'label': 'Security', 'description': 'Authentication, firewall, bans'},
    'cluster': {'label': 'Cluster', 'description': 'Cluster health and quorum'},
}


# ─── Template Renderer ───────────────────────────────────────────

class _SafeDict(dict):
    """Mapping for str.format_map() that renders missing placeholders as ''.

    This removes the need for a hand-maintained list of every variable used
    in TEMPLATES: any placeholder absent from the event data simply renders
    empty, and the blank-line cleanup in render_template() drops lines that
    end up empty as a result.
    """

    def __missing__(self, key: str) -> str:
        return ''


def _get_hostname() -> str:
    """Get short hostname for message titles (falls back to 'proxmox')."""
    try:
        return socket.gethostname().split('.')[0]
    except Exception:
        return 'proxmox'


def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, str]:
    """Render a template with the given data.

    Args:
        event_type: Key from TEMPLATES dict
        data: Variables to fill into the template

    Returns:
        {'title': rendered_title, 'body': rendered_body, 'severity': severity}

    Unknown event types fall back to a generic title/body built from `data`.
    Placeholders not present in `data` render as empty strings (never as
    literal '{braces}'), and lines left empty by missing optional variables
    are removed from the body.
    """
    template = TEMPLATES.get(event_type)
    if not template:
        # Fallback for unknown event types
        return {
            'title': f"{_get_hostname()}: {event_type}",
            'body': data.get('message', data.get('reason', str(data))),
            'severity': data.get('severity', 'INFO'),
        }

    # hostname/timestamp/severity are always available; everything else
    # comes from the caller's data and defaults to '' via _SafeDict.
    variables = _SafeDict({
        'hostname': _get_hostname(),
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        'severity': data.get('severity', 'INFO'),
    })
    variables.update(data)

    try:
        title = template['title'].format_map(variables)
    except ValueError:
        title = template['title']  # Use raw template if the format spec is malformed

    try:
        body = template['body'].format_map(variables)
    except ValueError:
        body = template['body']

    # Clean up empty lines from missing optional variables
    body = '\n'.join(line for line in body.split('\n') if line.strip())

    return {
        'title': title,
        'body': body,
        'severity': variables.get('severity', 'INFO'),
    }


def get_event_types_by_group() -> Dict[str, list]:
    """Get all event types organized by group, for UI rendering.

    Returns:
        {group_key: [{'type': event_type, 'title': template_title,
                      'default_enabled': bool}, ...]}

    The '{hostname}: ' prefix is stripped from titles so the UI shows a
    host-independent label.
    """
    result = {}
    for event_type, template in TEMPLATES.items():
        group = template.get('group', 'system')
        if group not in result:
            result[group] = []
        result[group].append({
            'type': event_type,
            'title': template['title'].replace('{hostname}', '').strip(': '),
            'default_enabled': template.get('default_enabled', True),
        })
    return result


def get_default_enabled_events() -> Dict[str, bool]:
    """Get the default enabled state for all event types."""
    return {
        event_type: template.get('default_enabled', True)
        for event_type, template in TEMPLATES.items()
    }


# ─── AI Enhancement (Optional) ───────────────────────────────────

class AIEnhancer:
    """Optional AI message enhancement using external LLM API.

    Enriches template-generated messages with context and suggestions.
    Falls back to original message if AI is unavailable or fails.
    """

    SYSTEM_PROMPT = """You are a Proxmox system administrator assistant.
You receive a notification message about a server event and must enhance it with:
1. A brief explanation of what this means in practical terms
2. A suggested action if applicable (1-2 sentences max)

Keep the response concise (max 3 sentences total). Do not repeat the original message.
Respond in the same language as the input message."""

    def __init__(self, provider: str, api_key: str, model: str = ''):
        # Provider is matched case-insensitively; an empty api_key disables
        # enhancement entirely.
        self.provider = provider.lower()
        self.api_key = api_key
        self.model = model
        self._enabled = bool(api_key)

    @property
    def enabled(self) -> bool:
        return self._enabled

    def enhance(self, title: str, body: str, severity: str) -> Optional[str]:
        """Enhance a notification message with AI context.

        Returns enhanced body text, or None if enhancement fails/disabled.
        Network and API errors are logged and swallowed so a broken AI
        configuration never blocks notification delivery.
        """
        if not self._enabled:
            return None

        try:
            if self.provider in ('openai', 'groq'):
                return self._call_openai_compatible(title, body, severity)
        except Exception as e:
            print(f"[AIEnhancer] Enhancement failed: {e}")

        return None

    def _call_openai_compatible(self, title: str, body: str, severity: str) -> Optional[str]:
        """Call OpenAI-compatible API (works with OpenAI, Groq, local)."""
        if self.provider == 'groq':
            url = 'https://api.groq.com/openai/v1/chat/completions'
            model = self.model or 'llama-3.3-70b-versatile'
        else:  # openai
            url = 'https://api.openai.com/v1/chat/completions'
            model = self.model or 'gpt-4o-mini'

        user_msg = f"Severity: {severity}\nTitle: {title}\nMessage: {body}"

        payload = json.dumps({
            'model': model,
            'messages': [
                {'role': 'system', 'content': self.SYSTEM_PROMPT},
                {'role': 'user', 'content': user_msg},
            ],
            'max_tokens': 150,
            'temperature': 0.3,
        }).encode('utf-8')

        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.api_key}',
        }

        req = urllib.request.Request(url, data=payload, headers=headers)
        with urllib.request.urlopen(req, timeout=10) as resp:
            result = json.loads(resp.read().decode('utf-8'))
            content = result['choices'][0]['message']['content'].strip()
            return content if content else None


def format_with_ai(title: str, body: str, severity: str,
                   ai_config: Dict[str, str]) -> str:
    """Format a message with optional AI enhancement.

    If AI is configured and succeeds, appends AI insight to the body.
    Otherwise returns the original body unchanged.

    Args:
        title: Notification title
        body: Notification body
        severity: Severity level
        ai_config: {'enabled': 'true', 'provider': 'groq', 'api_key': '...', 'model': ''}

    Returns:
        Enhanced body string
    """
    # 'enabled' is a string flag ('true'/'false') because it comes straight
    # from the settings table; anything other than 'true' means disabled.
    if ai_config.get('enabled') != 'true' or not ai_config.get('api_key'):
        return body

    enhancer = AIEnhancer(
        provider=ai_config.get('provider', 'groq'),
        api_key=ai_config['api_key'],
        model=ai_config.get('model', ''),
    )

    insight = enhancer.enhance(title, body, severity)
    if insight:
        return f"{body}\n\n---\n{insight}"

    return body