From 2241b125d6bd5e2df17ef6e54e1a5729ccc37155 Mon Sep 17 00:00:00 2001 From: MacRimi Date: Wed, 25 Mar 2026 18:28:54 +0100 Subject: [PATCH] Update notification service --- AppImage/scripts/notification_events.py | 167 ++++++++++++++++++++- AppImage/scripts/notification_templates.py | 8 + 2 files changed, 173 insertions(+), 2 deletions(-) diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py index 3c6f7c70..a68a1c5a 100644 --- a/AppImage/scripts/notification_events.py +++ b/AppImage/scripts/notification_events.py @@ -26,6 +26,69 @@ from typing import Optional, Dict, Any, Tuple from pathlib import Path +# ─── Shared State for Cross-Watcher Coordination ────────────────── + +class _SharedState: + """Module-level state shared between all watchers. + + Used to coordinate behavior when host-level events affect VM/CT events: + - Suppress vm_stop/ct_stop during host shutdown (they're expected) + - Aggregate vm_start/ct_start during startup into single message + """ + def __init__(self): + self._lock = threading.Lock() + self._shutdown_time: float = 0 # timestamp when shutdown was detected + self._shutdown_grace = 120 # suppress VM/CT stops for 2 minutes after shutdown detected + self._startup_time: float = time.time() # when module was loaded (service start) + self._startup_grace = 300 # aggregate VM/CT starts for 5 minutes after startup + self._startup_vms: list = [] # [(vmid, vmname, 'vm'|'ct'), ...] + self._startup_aggregated = False # have we already sent the aggregated message? + + def mark_shutdown(self): + """Called when system_shutdown or system_reboot is detected.""" + with self._lock: + self._shutdown_time = time.time() + + def is_host_shutting_down(self) -> bool: + """Check if we're within the shutdown grace period.""" + with self._lock: + if self._shutdown_time == 0: + return False + return (time.time() - self._shutdown_time) < self._shutdown_grace + + def is_startup_period(self) -> bool: + """Check if we're within the startup aggregation period.""" + with self._lock: + return (time.time() - self._startup_time) < self._startup_grace + + def add_startup_vm(self, vmid: str, vmname: str, vm_type: str): + """Record a VM/CT start during startup period for later aggregation.""" + with self._lock: + self._startup_vms.append((vmid, vmname, vm_type)) + + def get_and_clear_startup_vms(self) -> list: + """Get all recorded startup VMs and clear the list.""" + with self._lock: + vms = self._startup_vms.copy() + self._startup_vms = [] + self._startup_aggregated = True + return vms + + def has_startup_vms(self) -> bool: + """Check if there are any startup VMs recorded.""" + with self._lock: + return len(self._startup_vms) > 0 + + def was_startup_aggregated(self) -> bool: + """Check if startup aggregation already happened.""" + with self._lock: + return self._startup_aggregated + + +# Global shared state instance +_shared_state = _SharedState() + + # ─── Event Object ───────────────────────────────────────────────── class NotificationEvent: @@ -1173,11 +1236,15 @@ class JournalWatcher: break if is_reboot: + # Mark shutdown state to suppress VM/CT stop events + _shared_state.mark_shutdown() self._emit('system_reboot', 'INFO', { 'reason': 'The system is rebooting.', 'hostname': self._hostname, }, entity='node', entity_id='') elif is_shutdown: + # Mark shutdown state to suppress VM/CT stop events + _shared_state.mark_shutdown() self._emit('system_shutdown', 'INFO', { 'reason': 'The system is shutting down.', 'hostname': self._hostname, @@ -1546,6 +1613,24 @@ class TaskWatcher: if self._is_vzdump_active(): return + # Suppress VM/CT stop/shutdown during host shutdown/reboot. + # When the host shuts down, all VMs/CTs stop - that's expected behavior, + # not something that needs individual notifications. + _SHUTDOWN_NOISE = {'vm_stop', 'vm_shutdown', 'ct_stop', 'ct_shutdown'} + if event_type in _SHUTDOWN_NOISE and not is_error: + if _shared_state.is_host_shutting_down(): + return + + # During startup period, aggregate VM/CT starts into a single message. + # Instead of N individual "VM X started" messages, collect them and + # let PollingCollector emit one "System startup: X VMs, Y CTs started". + _STARTUP_EVENTS = {'vm_start', 'ct_start'} + if event_type in _STARTUP_EVENTS and not is_error: + if _shared_state.is_startup_period(): + vm_type = 'ct' if event_type == 'ct_start' else 'vm' + _shared_state.add_startup_vm(vmid, vmname or f'ID {vmid}', vm_type) + return + self._queue.put(NotificationEvent( event_type, severity, data, source='tasks', entity=entity, entity_id=vmid, @@ -1684,6 +1769,7 @@ class PollingCollector: # Dict[error_key, dict(category, severity, reason, first_seen, error_key)] self._known_errors: Dict[str, dict] = {} self._first_poll_done = False + self._startup_time = time.time() # Track when service started def start(self): if self._running: @@ -1706,10 +1792,17 @@ class PollingCollector: # ── Main loop ────────────────────────────────────────────── + # Startup grace period: ignore transient errors from certain categories + # during the first N seconds after service start. Remote services like + # PBS storage, VMs with qemu-guest-agent, etc. may take time to connect. + STARTUP_GRACE_PERIOD = 180 # 3 minutes + STARTUP_GRACE_CATEGORIES = {'storage', 'vms', 'network', 'pve_services'} + def _poll_loop(self): """Main polling loop.""" - # Initial delay to let health monitor warm up - for _ in range(15): + # Initial delay to let health monitor and external services warm up. + # PBS storage, NFS mounts, VMs with guest agent all need time after boot. + for _ in range(60): if not self._running: return time.sleep(1) @@ -1750,6 +1843,9 @@ class PollingCollector: return self._check_ai_model_availability() + # Check if startup period ended and we have aggregated VMs to report + self._check_startup_aggregation() + except Exception as e: print(f"[PollingCollector] Error: {e}") @@ -1808,6 +1904,15 @@ class PollingCollector: if error.get('acknowledged') == 1: continue + # Startup grace period: ignore transient errors from categories that + # typically need time to stabilize after boot (storage, VMs, network). + # PBS storage, NFS mounts, VMs with qemu-guest-agent need time to connect. + time_since_startup = now - self._startup_time + if time_since_startup < self.STARTUP_GRACE_PERIOD: + if category in self.STARTUP_GRACE_CATEGORIES: + # Still within grace period for this category - skip notification + continue + # On first poll, seed _last_notified for all existing errors so we # don't re-notify old persistent errors that were already sent before # a service restart. Only genuinely NEW errors (appearing after the @@ -2054,6 +2159,64 @@ class PollingCollector: self._known_errors = current_keys self._first_poll_done = True + def _check_startup_aggregation(self): + """Check if startup period ended and emit aggregated VM/CT start message. + + During the startup grace period, TaskWatcher collects VM/CT starts instead + of emitting individual notifications. Once the period ends, this method + emits a single aggregated "System startup" notification. + """ + # Only check once startup period is over + if _shared_state.is_startup_period(): + return + + # Only emit once + if _shared_state.was_startup_aggregated(): + return + + # Get all collected startup VMs/CTs + startup_items = _shared_state.get_and_clear_startup_vms() + if not startup_items: + return + + # Count VMs and CTs + vms = [(vmid, name) for vmid, name, vtype in startup_items if vtype == 'vm'] + cts = [(vmid, name) for vmid, name, vtype in startup_items if vtype == 'ct'] + + vm_count = len(vms) + ct_count = len(cts) + total = vm_count + ct_count + + # Build entity list (max 10 items for readability) + entity_names = [] + for vmid, name in (vms + cts)[:10]: + entity_names.append(f'{name} ({vmid})') + if total > 10: + entity_names.append(f'...and {total - 10} more') + + # Build summary text + parts = [] + if vm_count: + parts.append(f'{vm_count} VM{"s" if vm_count != 1 else ""}') + if ct_count: + parts.append(f'{ct_count} CT{"s" if ct_count != 1 else ""}') + summary = ' and '.join(parts) + ' started' + + data = { + 'hostname': self._hostname, + 'summary': summary, + 'vm_count': vm_count, + 'ct_count': ct_count, + 'total_count': total, + 'entity_list': ', '.join(entity_names), + 'reason': f'System startup completed: {summary}', + } + + self._queue.put(NotificationEvent( + 'system_startup', 'INFO', data, source='polling', + entity='node', entity_id='', + )) + # ── Update check (enriched) ──────────────────────────────── # Proxmox-related package prefixes used for categorisation diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py index a78f0144..897556f0 100644 --- a/AppImage/scripts/notification_templates.py +++ b/AppImage/scripts/notification_templates.py @@ -644,6 +644,13 @@ TEMPLATES = { }, # ── Services events ── + 'system_startup': { + 'title': '{hostname}: System startup — {summary}', + 'body': 'System startup completed.\n{summary}\n\nGuests: {entity_list}', + 'label': 'System startup', + 'group': 'services', + 'default_enabled': True, + }, 'system_shutdown': { 'title': '{hostname}: System shutting down', 'body': 'The node is shutting down.\n{reason}', @@ -1120,6 +1127,7 @@ EVENT_EMOJI = { 'node_disconnect': '\U0001F50C', 'node_reconnect': '\u2705', # Services + 'system_startup': '\U0001F680', # rocket (startup) 'system_shutdown': '\u23FB\uFE0F', # power symbol (Unicode) 'system_reboot': '\U0001F504', 'system_problem': '\u26A0\uFE0F',