From de13eb5b96fe631b8b6ef0eea29aece7d38580e4 Mon Sep 17 00:00:00 2001 From: MacRimi Date: Sat, 21 Feb 2026 17:23:03 +0100 Subject: [PATCH] Update notification service --- AppImage/scripts/notification_events.py | 363 +++++++++++++++------ AppImage/scripts/notification_manager.py | 15 +- AppImage/scripts/notification_templates.py | 54 ++- 3 files changed, 333 insertions(+), 99 deletions(-) diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py index 6b92d2cb..0b30978a 100644 --- a/AppImage/scripts/notification_events.py +++ b/AppImage/scripts/notification_events.py @@ -18,6 +18,7 @@ import json import time import hashlib import socket +import sqlite3 import subprocess import threading from queue import Queue @@ -614,27 +615,68 @@ class TaskWatcher: # ─── Polling Collector ──────────────────────────────────────────── class PollingCollector: - """Periodic collector that reads Health Monitor pending notifications. + """Periodic collector that polls health state independently. - Polls health_persistence for: - - Pending notification events (state changes from Bloque A) - - Unnotified errors - - Update availability (every 24h) + Architecture: + - Completely independent from Health Monitor's suppression system. + Suppression Duration only affects the UI health badge; it NEVER blocks + notifications. + - Reads ``get_active_errors()`` (ALL active errors, even suppressed ones) + and decides when to notify based on its own 24-hour cycle. + - For *new* errors (first_seen within the last poll interval), notifies + immediately. + - For *persistent* errors (already known), re-notifies once every 24 h. + - Update checks run on their own 24-h timer and include security counts. + + Tracking is stored in ``notification_last_sent`` (same DB). """ - def __init__(self, event_queue: Queue, poll_interval: int = 30): + DIGEST_INTERVAL = 86400 # 24 h between re-notifications + UPDATE_CHECK_INTERVAL = 86400 # 24 h between update scans + NEW_ERROR_WINDOW = 120 # seconds – errors younger than this are "new" + + _ENTITY_MAP = { + 'cpu': ('node', ''), 'memory': ('node', ''), 'temperature': ('node', ''), + 'disk': ('storage', ''), 'network': ('network', ''), + 'pve_services': ('node', ''), 'security': ('user', ''), + 'updates': ('node', ''), 'storage': ('storage', ''), + } + + # Map health-persistence category names to our TEMPLATES event types. + # These must match keys in notification_templates.TEMPLATES exactly. 
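+    # Categories missing from this map fall back to the generic
+    # 'system_problem' event type (see _check_persistent_health below).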
+ _CATEGORY_TO_EVENT_TYPE = { + 'cpu': 'cpu_high', + 'memory': 'ram_high', + 'load': 'load_high', + 'temperature': 'temp_high', + 'disk': 'disk_space_low', + 'storage': 'disk_space_low', + 'network': 'network_down', + 'pve_services': 'service_fail', + 'security': 'auth_fail', + 'updates': 'update_available', + 'zfs': 'disk_io_error', + 'smart': 'disk_io_error', + } + + def __init__(self, event_queue: Queue, poll_interval: int = 60): self._queue = event_queue self._running = False self._thread: Optional[threading.Thread] = None self._poll_interval = poll_interval self._hostname = _hostname() self._last_update_check = 0 - self._update_check_interval = 86400 # 24 hours + # In-memory cache: error_key -> last notification timestamp + self._last_notified: Dict[str, float] = {} + # Track known error keys so we can detect truly new ones + self._known_errors: set = set() + self._first_poll_done = False def start(self): if self._running: return self._running = True + self._load_last_notified() self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='polling-collector') self._thread.start() @@ -642,92 +684,121 @@ class PollingCollector: def stop(self): self._running = False + # ── Main loop ────────────────────────────────────────────── + def _poll_loop(self): """Main polling loop.""" # Initial delay to let health monitor warm up - for _ in range(10): + for _ in range(15): if not self._running: return time.sleep(1) while self._running: try: - self._collect_health_events() + self._check_persistent_health() self._check_updates() except Exception as e: print(f"[PollingCollector] Error: {e}") - # Sleep in small increments for responsive shutdown for _ in range(self._poll_interval): if not self._running: return time.sleep(1) - def _collect_health_events(self): - """Collect pending notification events from health_persistence.""" + # ── Health errors (independent of suppression) ───────────── + + def _check_persistent_health(self): + """Read ALL active errors from health_persistence and decide + whether each one warrants a notification right now. 
+ + Rules: + - A *new* error (not in _known_errors) -> notify immediately + - A *persistent* error already notified -> re-notify after 24 h + - Uses its own tracking, NOT the health monitor's needs_notification flag + """ try: from health_persistence import health_persistence - - # Get pending notification events - events = health_persistence.get_pending_notifications() - for evt in events: - data = json.loads(evt.get('data', '{}')) if isinstance(evt.get('data'), str) else evt.get('data', {}) - - event_type = evt.get('event_type', 'state_change') - severity = data.get('severity', 'WARNING') - - data['hostname'] = self._hostname - data['error_key'] = evt.get('error_key', '') - - # Deduce entity from health category - category = data.get('category', '') - entity_map = { - 'cpu': ('node', ''), 'memory': ('node', ''), - 'disk': ('storage', ''), 'network': ('network', ''), - 'pve_services': ('node', ''), 'security': ('user', ''), - 'updates': ('node', ''), 'storage': ('storage', ''), - } - entity, eid = entity_map.get(category, ('node', '')) - self._queue.put(NotificationEvent( - event_type, severity, data, source='health', - entity=entity, entity_id=eid or data.get('error_key', ''), - )) - - # Mark events as notified - if events: - event_ids = [e['id'] for e in events if 'id' in e] - if event_ids: - health_persistence.mark_events_notified(event_ids) - - # Also check unnotified errors - unnotified = health_persistence.get_unnotified_errors() - for error in unnotified: - err_cat = error.get('category', '') - e_entity, e_eid = entity_map.get(err_cat, ('node', '')) - self._queue.put(NotificationEvent( - 'new_error', error.get('severity', 'WARNING'), { - 'category': err_cat, - 'reason': error.get('reason', ''), - 'hostname': self._hostname, - 'error_key': error.get('error_key', ''), - }, - source='health', - entity=e_entity, - entity_id=e_eid or error.get('error_key', ''), - )) - # Mark as notified - if 'id' in error: - health_persistence.mark_notified(error['id']) - + errors = health_persistence.get_active_errors() except ImportError: - pass # health_persistence not available (CLI mode) + return except Exception as e: - print(f"[PollingCollector] Health event collection error: {e}") + print(f"[PollingCollector] get_active_errors failed: {e}") + return + + now = time.time() + current_keys = set() + + for error in errors: + error_key = error.get('error_key', '') + if not error_key: + continue + + current_keys.add(error_key) + category = error.get('category', '') + severity = error.get('severity', 'WARNING') + reason = error.get('reason', '') + + # Determine if we should notify + is_new = error_key not in self._known_errors and self._first_poll_done + last_sent = self._last_notified.get(error_key, 0) + is_due = (now - last_sent) >= self.DIGEST_INTERVAL + + if not is_new and not is_due: + continue + + # Map to our event type + event_type = self._CATEGORY_TO_EVENT_TYPE.get(category, 'system_problem') + entity, eid = self._ENTITY_MAP.get(category, ('node', '')) + + data = { + 'hostname': self._hostname, + 'category': category, + 'reason': reason, + 'error_key': error_key, + 'severity': severity, + 'first_seen': error.get('first_seen', ''), + 'last_seen': error.get('last_seen', ''), + 'is_persistent': not is_new, + } + + # Include extra details if present + details = error.get('details') + if isinstance(details, dict): + data.update(details) + elif isinstance(details, str): + try: + data.update(json.loads(details)) + except (json.JSONDecodeError, TypeError): + pass + + self._queue.put(NotificationEvent( 
+ event_type, severity, data, source='health', + entity=entity, entity_id=eid or error_key, + )) + + # Track that we notified + self._last_notified[error_key] = now + self._persist_last_notified(error_key, now) + + # Remove tracking for errors that resolved + resolved = self._known_errors - current_keys + for key in resolved: + self._last_notified.pop(key, None) + + self._known_errors = current_keys + self._first_poll_done = True + + # ── Update check (enriched) ──────────────────────────────── def _check_updates(self): - """Check for available system updates (every 24h).""" + """Check for available system updates every 24 h. + + Enriched output: total count, security updates, PVE version hint, + and top package names. + """ now = time.time() - if now - self._last_update_check < self._update_check_interval: + if now - self._last_update_check < self.UPDATE_CHECK_INTERVAL: return self._last_update_check = now @@ -735,33 +806,107 @@ class PollingCollector: try: result = subprocess.run( ['apt-get', '-s', 'upgrade'], - capture_output=True, text=True, timeout=60 + capture_output=True, text=True, timeout=60, ) + if result.returncode != 0: + return - if result.returncode == 0: - # Count upgradeable packages - lines = [l for l in result.stdout.split('\n') - if l.startswith('Inst ')] - count = len(lines) - - if count > 0: - # Show first 5 package names - packages = [l.split()[1] for l in lines[:5]] - details = ', '.join(packages) - if count > 5: - details += f', ... and {count - 5} more' - - self._queue.put(NotificationEvent( - 'update_available', 'INFO', { - 'count': str(count), - 'details': details, - 'hostname': self._hostname, - }, - source='polling', - entity='node', entity_id='', - )) + lines = [l for l in result.stdout.split('\n') if l.startswith('Inst ')] + total = len(lines) + if total == 0: + return + + packages = [l.split()[1] for l in lines] + security = [p for p in packages if any( + kw in p.lower() for kw in ('security', 'cve', 'openssl', 'libssl') + )] + + # Also detect security updates via apt changelog / Debian-Security origin + sec_result = subprocess.run( + ['apt-get', '-s', 'upgrade', '-o', 'Dir::Etc::SourceList=/dev/null', + '-o', 'Dir::Etc::SourceParts=/dev/null'], + capture_output=True, text=True, timeout=30, + ) + # Count lines from security repo (rough heuristic) + sec_count = max(len(security), 0) + try: + sec_output = subprocess.run( + ['apt-get', '-s', '--only-upgrade', 'install'] + packages[:50], + capture_output=True, text=True, timeout=30, + ) + for line in sec_output.stdout.split('\n'): + if 'security' in line.lower() and 'Inst ' in line: + sec_count += 1 + except Exception: + pass + + # Check for PVE version upgrade + pve_packages = [p for p in packages if 'pve-' in p.lower() or 'proxmox-' in p.lower()] + + # Build display details + top_pkgs = packages[:8] + details = ', '.join(top_pkgs) + if total > 8: + details += f', ... 
+{total - 8} more' + + data = { + 'hostname': self._hostname, + 'count': str(total), + 'security_count': str(sec_count), + 'details': details, + 'packages': ', '.join(packages[:20]), + } + if pve_packages: + data['pve_packages'] = ', '.join(pve_packages) + + self._queue.put(NotificationEvent( + 'update_available', 'INFO', data, + source='polling', entity='node', entity_id='', + )) except Exception: - pass # Non-critical, silently skip + pass + + # ── Persistence helpers ──────────────────────────────────── + + def _load_last_notified(self): + """Load per-error notification timestamps from DB on startup.""" + try: + db_path = Path('/usr/local/share/proxmenux/health_monitor.db') + if not db_path.exists(): + return + conn = sqlite3.connect(str(db_path), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + cursor = conn.cursor() + cursor.execute( + "SELECT fingerprint, last_sent_ts FROM notification_last_sent " + "WHERE fingerprint LIKE 'health_%'" + ) + for fp, ts in cursor.fetchall(): + error_key = fp.replace('health_', '', 1) + self._last_notified[error_key] = ts + self._known_errors.add(error_key) + conn.close() + except Exception as e: + print(f"[PollingCollector] Failed to load last_notified: {e}") + + def _persist_last_notified(self, error_key: str, ts: float): + """Save per-error notification timestamp to DB.""" + try: + db_path = Path('/usr/local/share/proxmenux/health_monitor.db') + conn = sqlite3.connect(str(db_path), timeout=10) + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA busy_timeout=5000') + fp = f'health_{error_key}' + conn.execute(''' + INSERT OR REPLACE INTO notification_last_sent (fingerprint, last_sent_ts, count) + VALUES (?, ?, COALESCE( + (SELECT count + 1 FROM notification_last_sent WHERE fingerprint = ?), 1 + )) + ''', (fp, int(ts), fp)) + conn.commit() + conn.close() + except Exception: + pass # ─── Proxmox Webhook Receiver ─────────────────────────────────── @@ -801,6 +946,11 @@ class ProxmoxHookWatcher: event_type, entity, entity_id = self._classify( notification_type, source_component, title, body, payload ) + + # Discard meta-events (overall status changes, update status, etc.) + if event_type == '_skip': + return {'accepted': False, 'skipped': True, 'reason': 'Meta-event filtered'} + severity = self._map_severity(severity_raw) data = { @@ -830,11 +980,28 @@ class ProxmoxHookWatcher: def _classify(self, ntype: str, component: str, title: str, body: str, payload: dict) -> tuple: - """Classify webhook payload into (event_type, entity, entity_id).""" + """Classify webhook payload into (event_type, entity, entity_id). + + Returns ('_skip', '', '') for meta-events we should discard. + """ title_lower = (title or '').lower() body_lower = (body or '').lower() component_lower = (component or '').lower() + # ── Skip PVE meta-events ── + # PVE sends "overall status changed from OK to WARNING" which is a meta + # aggregation event. Our own health monitor handles the underlying issues + # with better granularity, so we skip these to avoid noise/duplicates. + if 'overall' in title_lower and ('changed' in title_lower or 'status' in title_lower): + return '_skip', '', '' + + # ── Skip "updates changed" status events ── + # PVE sends "updates status changed from OK to WARNING" when apt updates + # are available. Our PollingCollector already handles update checks with + # proper detail (security count, package list) on a 24h cycle. 
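+        # Both skip rules return '_skip'; the caller answers such payloads with
+        # {'accepted': False, 'skipped': True} instead of queueing an event.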
+ if 'updates' in title_lower and ('changed' in title_lower or 'status' in title_lower): + return '_skip', '', '' + # VM / CT lifecycle events (if sent via webhook) vmid = str(payload.get('vmid', '')) if any(k in component_lower for k in ('qemu', 'lxc', 'vm', 'ct', 'container')): @@ -872,8 +1039,8 @@ class ProxmoxHookWatcher: if 'replication' in component_lower or 'replication' in title_lower: vmid = str(payload.get('vmid', '')) if 'fail' in title_lower or 'error' in body_lower: - return 'vm_fail', 'vm', vmid - return 'migration_complete', 'vm', vmid + return 'replication_fail', 'vm', vmid + return 'replication_complete', 'vm', vmid # PBS (Proxmox Backup Server) if 'pbs' in component_lower or 'backup' in component_lower: @@ -901,8 +1068,10 @@ class ProxmoxHookWatcher: if 'network' in component_lower: return 'network_down', 'network', '' - # Security - if any(k in component_lower for k in ('auth', 'firewall', 'security')): + # Security -- distinguish firewall from auth + if 'firewall' in component_lower or 'firewall' in title_lower: + return 'firewall_issue', 'node', '' + if any(k in component_lower for k in ('auth', 'security', 'pam', 'sshd')): return 'auth_fail', 'user', '' # Fallback: system_problem generic diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py index b013cbdf..11669a80 100644 --- a/AppImage/scripts/notification_manager.py +++ b/AppImage/scripts/notification_manager.py @@ -495,10 +495,23 @@ class NotificationManager: self._dispatch_event(event) def _process_event_direct(self, event: NotificationEvent): - """Process a burst summary event. Bypasses aggregator but applies all other filters.""" + """Process a burst summary event. Bypasses aggregator but applies ALL other filters.""" if not self._enabled: return + # Check group filter (same as _process_event) + template = TEMPLATES.get(event.event_type, {}) + event_group = template.get('group', 'system') + group_setting = f'events.{event_group}' + if self._config.get(group_setting, 'true') == 'false': + return + + # Check per-event filter (same as _process_event) + default_enabled = 'true' if template.get('default_enabled', True) else 'false' + event_specific = f'event.{event.event_type}' + if self._config.get(event_specific, default_enabled) == 'false': + return + # Check severity filter (same mapping as _process_event) severity_map = {'all': 'INFO', 'warning': 'WARNING', 'critical': 'CRITICAL'} raw_filter = self._config.get('severity_filter', 'all') diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py index eafa358f..1bef87ca 100644 --- a/AppImage/scripts/notification_templates.py +++ b/AppImage/scripts/notification_templates.py @@ -45,11 +45,14 @@ SEVERITY_ICONS_DISCORD = { TEMPLATES = { # ── Health Monitor state changes ── + # NOTE: state_change is disabled by default -- it fires on every + # status oscillation (OK->WARNING->OK) which creates noise. + # The health_persistent and new_error templates cover this better. 
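+    # It can still be re-enabled via the 'event.state_change' config key,
+    # which notification_manager checks for every event type.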
'state_change': { 'title': '{hostname}: {category} changed to {current}', 'body': '{category} status changed from {previous} to {current}.\n{reason}', 'group': 'system', - 'default_enabled': True, + 'default_enabled': False, }, 'new_error': { 'title': '{hostname}: New {severity} - {category}', @@ -137,6 +140,18 @@ TEMPLATES = { 'group': 'vm_ct', 'default_enabled': True, }, + 'replication_fail': { + 'title': '{hostname}: Replication FAILED - {vmid}', + 'body': 'Replication of {vmname} ({vmid}) has failed.\n{reason}', + 'group': 'vm_ct', + 'default_enabled': True, + }, + 'replication_complete': { + 'title': '{hostname}: Replication complete - {vmid}', + 'body': 'Replication of {vmname} ({vmid}) completed successfully.', + 'group': 'vm_ct', + 'default_enabled': False, + }, # ── Backup / Snapshot events ── 'backup_start': { @@ -314,6 +329,40 @@ TEMPLATES = { 'default_enabled': False, }, + # ── Persistent Health Issues (daily digest) ── + 'health_persistent': { + 'title': '{hostname}: {count} active health issue(s)', + 'body': 'The following health issues remain active:\n{issue_list}\n\nThis digest is sent once every 24 hours while issues persist.', + 'group': 'system', + 'default_enabled': True, + }, + 'health_issue_new': { + 'title': '{hostname}: New health issue - {category}', + 'body': 'New {severity} issue detected:\n{reason}', + 'group': 'system', + 'default_enabled': True, + }, + 'health_issue_resolved': { + 'title': '{hostname}: Resolved - {category}', + 'body': '{category} issue has been resolved.\n{reason}\nDuration: {duration}', + 'group': 'system', + 'default_enabled': True, + }, + + # ── Update notifications (enriched) ── + 'update_summary': { + 'title': '{hostname}: {total_count} updates available', + 'body': '{security_count} security update(s), {total_count} total.\n{package_list}', + 'group': 'system', + 'default_enabled': True, + }, + 'pve_update': { + 'title': '{hostname}: PVE update available ({version})', + 'body': 'Proxmox VE update available: {version}\n{details}', + 'group': 'system', + 'default_enabled': True, + }, + # ── Burst aggregation summaries ── 'burst_auth_fail': { 'title': '{hostname}: {count} auth failures in {window}', @@ -407,6 +456,9 @@ def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]: 'used': '', 'total': '', 'available': '', 'cores': '', 'count': '', 'size': '', 'snapshot_name': '', 'jail': '', 'failures': '', 'quorum': '', 'change_details': '', 'message': '', + 'security_count': '0', 'total_count': '0', 'package_list': '', + 'packages': '', 'pve_packages': '', 'version': '', + 'issue_list': '', 'error_key': '', } variables.update(data)
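A minimal rendering sketch for the new update_summary template. The values are illustrative, and render_template is assumed to return a dict containing the formatted title and body, as implied by its signature and the variable handling shown above:

    from notification_templates import render_template

    # Hypothetical data, shaped like the enriched update-check output:
    rendered = render_template('update_summary', {
        'hostname': 'pve01',
        'total_count': '14',
        'security_count': '3',
        'package_list': 'openssl, pve-manager, libpve-common-perl',
    })
    # Given the template defined in this patch, the expected output is:
    #   title: "pve01: 14 updates available"
    #   body:  "3 security update(s), 14 total."
    #          followed on the next line by the package list.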