From 1242da5ed170d6c76d457e48a9b1877e0891658e Mon Sep 17 00:00:00 2001 From: MacRimi Date: Fri, 27 Feb 2026 18:27:24 +0100 Subject: [PATCH] Update notification service --- AppImage/scripts/flask_server.py | 68 +++++- AppImage/scripts/health_monitor.py | 258 ++++++++++++++++----- AppImage/scripts/health_persistence.py | 61 +++-- AppImage/scripts/notification_events.py | 109 ++++++--- AppImage/scripts/notification_manager.py | 17 ++ AppImage/scripts/notification_templates.py | 15 +- 6 files changed, 403 insertions(+), 125 deletions(-) diff --git a/AppImage/scripts/flask_server.py b/AppImage/scripts/flask_server.py index 9c0088ef..dd38090e 100644 --- a/AppImage/scripts/flask_server.py +++ b/AppImage/scripts/flask_server.py @@ -23,6 +23,7 @@ import time import threading import urllib.parse import hardware_monitor +import health_persistence import xml.etree.ElementTree as ET from datetime import datetime, timedelta from functools import wraps @@ -1159,19 +1160,66 @@ def get_storage_info(): 'ssd_life_left': smart_data.get('ssd_life_left') # Added } - storage_data['disk_count'] += 1 - health = smart_data.get('health', 'unknown').lower() - if health == 'healthy': - storage_data['healthy_disks'] += 1 - elif health == 'warning': - storage_data['warning_disks'] += 1 - elif health in ['critical', 'failed']: - storage_data['critical_disks'] += 1 - except Exception as e: - # print(f"Error getting disk list: {e}") pass + # Enrich physical disks with active I/O errors from health_persistence. + # This is the single source of truth -- health_monitor detects ATA/SCSI/IO + # errors via dmesg, records them in health_persistence, and we read them here. + try: + active_disk_errors = health_persistence.get_active_errors(category='disks') + for err in active_disk_errors: + details = err.get('details', {}) + if isinstance(details, str): + try: + details = json.loads(details) + except (json.JSONDecodeError, TypeError): + details = {} + + err_device = details.get('disk', '') + error_count = details.get('error_count', 0) + sample = details.get('sample', '') + severity = err.get('severity', 'WARNING') + + # Match error to physical disk. + # err_device can be 'sda', 'nvme0n1', or 'ata8' (if resolution failed) + matched_disk = None + if err_device in physical_disks: + matched_disk = err_device + else: + # Try partial match: 'sda' matches disk 'sda' + for dk in physical_disks: + if dk == err_device or err_device.startswith(dk): + matched_disk = dk + break + + if matched_disk: + physical_disks[matched_disk]['io_errors'] = { + 'count': error_count, + 'severity': severity, + 'sample': sample, + 'reason': err.get('reason', ''), + } + # Override health status if I/O errors are more severe + current_health = physical_disks[matched_disk].get('health', 'unknown').lower() + if severity == 'CRITICAL' and current_health != 'critical': + physical_disks[matched_disk]['health'] = 'critical' + elif severity == 'WARNING' and current_health in ('healthy', 'unknown'): + physical_disks[matched_disk]['health'] = 'warning' + except Exception: + pass + + # Count disk health states AFTER I/O error enrichment + for disk_name, disk_info in physical_disks.items(): + storage_data['disk_count'] += 1 + health = disk_info.get('health', 'unknown').lower() + if health == 'healthy': + storage_data['healthy_disks'] += 1 + elif health == 'warning': + storage_data['warning_disks'] += 1 + elif health in ['critical', 'failed']: + storage_data['critical_disks'] += 1 + storage_data['total'] = round(total_disk_size_bytes / (1024**4), 1) # Get disk usage for mounted partitions diff --git a/AppImage/scripts/health_monitor.py b/AppImage/scripts/health_monitor.py index ad2f5fa7..04ace2b1 100644 --- a/AppImage/scripts/health_monitor.py +++ b/AppImage/scripts/health_monitor.py @@ -324,6 +324,12 @@ class HealthMonitor: Returns JSON structure with ALL 10 categories always present. Now includes persistent error tracking. """ + # Run cleanup on every status check to auto-resolve stale errors + try: + health_persistence.cleanup_old_errors() + except Exception: + pass + active_errors = health_persistence.get_active_errors() # No need to create persistent_issues dict here, it's implicitly handled by the checks @@ -1087,16 +1093,67 @@ class HealthMonitor: return storages + def _resolve_ata_to_disk(self, ata_port: str) -> str: + """Resolve an ATA controller name (e.g. 'ata8') to a block device (e.g. 'sda'). + + Uses /sys/class/ata_port/ symlinks and /sys/block/ to find the mapping. + Falls back to parsing dmesg for 'ata8: SATA link up' -> 'sd 7:0:0:0: [sda]'. + """ + if not ata_port or not ata_port.startswith('ata'): + return ata_port + + port_num = ata_port.replace('ata', '') + + # Method 1: Walk /sys/class/ata_port/ -> host -> target -> block + try: + ata_path = f'/sys/class/ata_port/{ata_port}' + if os.path.exists(ata_path): + device_path = os.path.realpath(ata_path) + # Walk up to find the SCSI host, then find block devices + # Path: /sys/devices/.../ataX/hostY/targetY:0:0/Y:0:0:0/block/sdZ + for root, dirs, files in os.walk(os.path.dirname(device_path)): + if 'block' in dirs: + block_path = os.path.join(root, 'block') + devs = os.listdir(block_path) + if devs: + return devs[0] # e.g. 'sda' + except (OSError, IOError): + pass + + # Method 2: Parse dmesg for ATA link messages + try: + result = subprocess.run( + ['dmesg', '--notime'], + capture_output=True, text=True, timeout=2 + ) + if result.returncode == 0: + # Look for "ata8: SATA link up" followed by "sd X:0:0:0: [sda]" + lines = result.stdout.split('\n') + host_num = None + for line in lines: + m = re.search(rf'{ata_port}:\s+SATA link', line) + if m: + # ata port number maps to host(N-1) typically + host_num = int(port_num) - 1 + if host_num is not None: + m2 = re.search(rf'sd\s+{host_num}:\d+:\d+:\d+:\s+\[(\w+)\]', line) + if m2: + return m2.group(1) + except (OSError, subprocess.TimeoutExpired): + pass + + return ata_port # Return original if resolution fails + def _check_disks_optimized(self) -> Dict[str, Any]: """ - Optimized disk check - always returns status. - Checks dmesg for I/O errors and SMART status. - NOTE: This function is now largely covered by _check_storage_optimized, - but kept for potential specific disk-level reporting if needed. - Currently, its primary function is to detect recent I/O errors. + Disk I/O error check -- the SINGLE source of truth for disk errors. + + Reads dmesg for I/O/ATA/SCSI errors, counts per device, records in + health_persistence, and returns status for the health dashboard. + Resolves ATA controller names (ata8) to physical disks (sda). """ current_time = time.time() - disk_issues = {} + disk_results = {} # Single dict for both WARNING and CRITICAL try: # Check dmesg for I/O errors in the last 5 minutes @@ -1107,17 +1164,52 @@ class HealthMonitor: timeout=2 ) + # Collect a sample line per device for richer error messages + disk_samples = {} + if result.returncode == 0: for line in result.stdout.split('\n'): line_lower = line.lower() - if any(keyword in line_lower for keyword in ['i/o error', 'ata error', 'scsi error', 'medium error']): - # Try to extract disk name - disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+)', line) - if disk_match: - disk_name = disk_match.group(1) + # Detect various disk error formats + is_disk_error = any(kw in line_lower for kw in [ + 'i/o error', 'scsi error', 'medium error', + 'failed command:', 'exception emask', + ]) + ata_match = re.search(r'(ata\d+)[\.\d]*:.*(?:error|failed|exception)', line_lower) + if ata_match: + is_disk_error = True + + if is_disk_error: + # Extract device from multiple formats + raw_device = None + for dev_re in [ + r'dev\s+(sd[a-z]+)', # dev sdb + r'\[(sd[a-z]+)\]', # [sda] + r'/dev/(sd[a-z]+)', # /dev/sda + r'(nvme\d+n\d+)', # nvme0n1 + r'device\s+(sd[a-z]+\d*)', # device sda1 + r'(ata\d+)', # ata8 (ATA controller) + ]: + dm = re.search(dev_re, line) + if dm: + raw_device = dm.group(1) + break + + if raw_device: + # Resolve ATA port to physical disk name + if raw_device.startswith('ata'): + resolved = self._resolve_ata_to_disk(raw_device) + disk_name = resolved + else: + disk_name = raw_device.rstrip('0123456789') if raw_device.startswith('sd') else raw_device + self.io_error_history[disk_name].append(current_time) + if disk_name not in disk_samples: + # Clean the sample: strip dmesg timestamp prefix + clean = re.sub(r'^\[.*?\]\s*', '', line.strip()) + disk_samples[disk_name] = clean[:200] - # Clean old history (keep errors from the last 5 minutes) + # Clean old history and evaluate per-disk status for disk in list(self.io_error_history.keys()): self.io_error_history[disk] = [ t for t in self.io_error_history[disk] @@ -1125,57 +1217,66 @@ class HealthMonitor: ] error_count = len(self.io_error_history[disk]) + error_key = f'disk_{disk}' + sample = disk_samples.get(disk, '') + display = f'/dev/{disk}' if not disk.startswith('/') else disk - # Report based on recent error count if error_count >= 3: - error_key = f'disk_{disk}' severity = 'CRITICAL' - reason = f'{error_count} I/O errors in 5 minutes' + reason = f'{display}: {error_count} I/O errors in 5 min' + if sample: + reason += f'\n{sample}' health_persistence.record_error( error_key=error_key, category='disks', severity=severity, reason=reason, - details={'disk': disk, 'error_count': error_count, 'dismissable': False} + details={'disk': disk, 'device': display, + 'error_count': error_count, + 'sample': sample, 'dismissable': False} ) - - disk_details[disk] = { + disk_results[display] = { 'status': severity, 'reason': reason, - 'dismissable': False + 'device': disk, + 'error_count': error_count, + 'dismissable': False, } elif error_count >= 1: - error_key = f'disk_{disk}' severity = 'WARNING' - reason = f'{error_count} I/O error(s) in 5 minutes' + reason = f'{display}: {error_count} I/O error(s) in 5 min' + if sample: + reason += f'\n{sample}' health_persistence.record_error( error_key=error_key, category='disks', severity=severity, reason=reason, - details={'disk': disk, 'error_count': error_count, 'dismissable': True} + details={'disk': disk, 'device': display, + 'error_count': error_count, + 'sample': sample, 'dismissable': True} ) - - disk_issues[f'/dev/{disk}'] = { + disk_results[display] = { 'status': severity, 'reason': reason, - 'dismissable': True + 'device': disk, + 'error_count': error_count, + 'dismissable': True, } else: - error_key = f'disk_{disk}' health_persistence.resolve_error(error_key, 'Disk errors cleared') - if not disk_issues: + if not disk_results: return {'status': 'OK'} - has_critical = any(d.get('status') == 'CRITICAL' for d in disk_issues.values()) + has_critical = any(d.get('status') == 'CRITICAL' for d in disk_results.values()) return { 'status': 'CRITICAL' if has_critical else 'WARNING', - 'reason': f"{len(disk_issues)} disk(s) with recent errors", - 'details': disk_issues + 'reason': f"{len(disk_results)} disk(s) with recent errors", + 'details': disk_results } except Exception as e: @@ -1418,6 +1519,19 @@ class HealthMonitor: Improved detection of container and VM errors from journalctl. """ try: + # First: auto-resolve any persisted VM/CT errors where the guest + # is now running. This clears stale "Failed to start" / QMP + # errors that are no longer relevant. + try: + active_vm_errors = health_persistence.get_active_errors('vms') + for err in active_vm_errors: + details = err.get('details') or {} + vmid = details.get('id', '') + if vmid: + health_persistence.check_vm_running(vmid) + except Exception: + pass + issues = [] vm_details = {} @@ -1470,11 +1584,15 @@ class HealthMonitor: else: reason = 'Container error' - issues.append(f'CT {ctid}: {reason}') + ct_name = self._resolve_vm_name(ctid) + display = f"CT {ctid} ({ct_name})" if ct_name else f"CT {ctid}" + full_reason = f'{display}: {reason}\n{line.strip()[:200]}' + issues.append(f'{display}: {reason}') vm_details[key] = { 'status': 'WARNING' if 'device' in reason.lower() else 'CRITICAL', - 'reason': reason, + 'reason': full_reason, 'id': ctid, + 'vmname': ct_name, 'type': 'CT' } continue @@ -1509,11 +1627,15 @@ class HealthMonitor: vmid = id_match.group(1) key = f'vmct_{vmid}' if key not in vm_details: - issues.append(f'VM/CT {vmid}: Failed to start') + vm_name = self._resolve_vm_name(vmid) + display = f"VM/CT {vmid} ({vm_name})" if vm_name else f"VM/CT {vmid}" + full_reason = f'{display}: Failed to start\n{line.strip()[:200]}' + issues.append(f'{display}: Failed to start') vm_details[key] = { 'status': 'CRITICAL', - 'reason': 'Failed to start', + 'reason': full_reason, 'id': vmid, + 'vmname': vm_name, 'type': 'VM/CT' } @@ -1661,20 +1783,25 @@ class HealthMonitor: vm_type = 'VM/CT' if error_key not in vm_details: - reason = 'Failed to start' + vm_name = self._resolve_vm_name(vmid_ctid) + display = f"{vm_type} {vmid_ctid}" + if vm_name: + display = f"{vm_type} {vmid_ctid} ({vm_name})" + reason = f'{display}: Failed to start\n{line.strip()[:200]}' # Record persistent error health_persistence.record_error( error_key=error_key, category='vms', severity='CRITICAL', reason=reason, - details={'id': vmid_ctid, 'type': vm_type} + details={'id': vmid_ctid, 'vmname': vm_name, 'type': vm_type} ) - issues.append(f'{vm_type} {vmid_ctid}: {reason}') + issues.append(f'{display}: Failed to start') vm_details[error_key] = { 'status': 'CRITICAL', 'reason': reason, 'id': vmid_ctid, + 'vmname': vm_name, 'type': vm_type } @@ -1979,6 +2106,18 @@ class HealthMonitor: if recent_count >= 5 and recent_count >= prev_count * 4: spike_errors[pattern] = recent_count + # Helper: get human-readable samples from normalized patterns + def _get_samples(error_dict, max_items=3): + """Return list of readable sample lines for error patterns.""" + samples = [] + for pattern in list(error_dict.keys())[:max_items]: + pdata = self.persistent_log_patterns.get(pattern, {}) + sample = pdata.get('sample', pattern) + # Trim timestamp prefix if present (e.g. "Feb 27 16:03:35 host ") + clean = re.sub(r'^[A-Z][a-z]{2}\s+\d+\s+[\d:]+\s+\S+\s+', '', sample) + samples.append(clean[:120]) + return samples + persistent_errors = {} for pattern, data in self.persistent_log_patterns.items(): time_span = current_time - data['first_seen'] @@ -2018,31 +2157,38 @@ class HealthMonitor: # Get a representative critical error reason representative_error = next(iter(critical_errors_found.values())) reason = f'Critical error detected: {representative_error[:100]}' - elif cascade_count > 0: - status = 'WARNING' - reason = f'Error cascade detected: {cascade_count} pattern(s) repeating ≥15 times in 3min' - elif spike_count > 0: - status = 'WARNING' - reason = f'Error spike detected: {spike_count} pattern(s) increased 4x' - elif persistent_count > 0: - status = 'WARNING' - reason = f'Persistent errors: {persistent_count} pattern(s) recurring over 15+ minutes' + elif cascade_count > 0: + status = 'WARNING' + samples = _get_samples(cascading_errors, 3) + reason = f'Error cascade ({cascade_count} patterns repeating):\n' + '\n'.join(f' - {s}' for s in samples) + elif spike_count > 0: + status = 'WARNING' + samples = _get_samples(spike_errors, 3) + reason = f'Error spike ({spike_count} patterns with 4x increase):\n' + '\n'.join(f' - {s}' for s in samples) + elif persistent_count > 0: + status = 'WARNING' + samples = _get_samples(persistent_errors, 3) + reason = f'Persistent errors ({persistent_count} patterns over 15+ min):\n' + '\n'.join(f' - {s}' for s in samples) else: # No significant issues found status = 'OK' reason = None # Record/clear persistent errors for each log sub-check so Dismiss works - log_sub_checks = { - 'log_error_cascade': {'active': cascade_count > 0, 'severity': 'WARNING', - 'reason': f'{cascade_count} pattern(s) repeating >=15 times'}, - 'log_error_spike': {'active': spike_count > 0, 'severity': 'WARNING', - 'reason': f'{spike_count} pattern(s) with 4x increase'}, - 'log_persistent_errors': {'active': persistent_count > 0, 'severity': 'WARNING', - 'reason': f'{persistent_count} recurring pattern(s) over 15+ min'}, - 'log_critical_errors': {'active': unique_critical_count > 0, 'severity': 'CRITICAL', - 'reason': f'{unique_critical_count} critical error(s) found', 'dismissable': False}, - } + cascade_samples = _get_samples(cascading_errors, 2) if cascade_count else [] + spike_samples = _get_samples(spike_errors, 2) if spike_count else [] + persist_samples = _get_samples(persistent_errors, 2) if persistent_count else [] + + log_sub_checks = { + 'log_error_cascade': {'active': cascade_count > 0, 'severity': 'WARNING', + 'reason': f'{cascade_count} pattern(s) repeating >=15 times:\n' + '\n'.join(f' - {s}' for s in cascade_samples) if cascade_count else ''}, + 'log_error_spike': {'active': spike_count > 0, 'severity': 'WARNING', + 'reason': f'{spike_count} pattern(s) with 4x increase:\n' + '\n'.join(f' - {s}' for s in spike_samples) if spike_count else ''}, + 'log_persistent_errors': {'active': persistent_count > 0, 'severity': 'WARNING', + 'reason': f'{persistent_count} recurring pattern(s) over 15+ min:\n' + '\n'.join(f' - {s}' for s in persist_samples) if persistent_count else ''}, + 'log_critical_errors': {'active': unique_critical_count > 0, 'severity': 'CRITICAL', + 'reason': f'{unique_critical_count} critical error(s) found', 'dismissable': False}, + } # Track which sub-checks were dismissed dismissed_keys = set() diff --git a/AppImage/scripts/health_persistence.py b/AppImage/scripts/health_persistence.py index d91b98c9..53ebe257 100644 --- a/AppImage/scripts/health_persistence.py +++ b/AppImage/scripts/health_persistence.py @@ -25,12 +25,8 @@ from pathlib import Path class HealthPersistence: """Manages persistent health error tracking""" - # Error retention periods (seconds) - VM_ERROR_RETENTION = 48 * 3600 # 48 hours - LOG_ERROR_RETENTION = 24 * 3600 # 24 hours - DISK_ERROR_RETENTION = 48 * 3600 # 48 hours - - # Default suppression: 24 hours (user can change per-category in settings) + # Default suppression duration when no user setting exists for a category. + # Users can override per-category via the Suppression Duration settings. DEFAULT_SUPPRESSION_HOURS = 24 # Mapping from error categories to settings keys @@ -496,32 +492,51 @@ class HealthPersistence: cursor = conn.cursor() now = datetime.now() + now_iso = now.isoformat() # Delete resolved errors older than 7 days cutoff_resolved = (now - timedelta(days=7)).isoformat() cursor.execute('DELETE FROM errors WHERE resolved_at < ?', (cutoff_resolved,)) - # Auto-resolve VM/CT errors older than 48h - cutoff_vm = (now - timedelta(seconds=self.VM_ERROR_RETENTION)).isoformat() - cursor.execute(''' - UPDATE errors - SET resolved_at = ? - WHERE category = 'vms' - AND resolved_at IS NULL - AND first_seen < ? - AND acknowledged = 0 - ''', (now.isoformat(), cutoff_vm)) + # ── Auto-resolve stale errors using user-configured Suppression Duration ── + # Read the per-category suppression hours from user_settings. + # If the user hasn't configured a category, fall back to DEFAULT_SUPPRESSION_HOURS. + # This is the SINGLE source of truth for auto-resolution timing. + user_settings = {} + try: + cursor.execute('SELECT setting_key, setting_value FROM user_settings WHERE setting_key LIKE ?', ('suppress_%',)) + for row in cursor.fetchall(): + user_settings[row[0]] = row[1] + except Exception: + pass - # Auto-resolve log errors older than 24h - cutoff_logs = (now - timedelta(seconds=self.LOG_ERROR_RETENTION)).isoformat() + for category, setting_key in self.CATEGORY_SETTING_MAP.items(): + stored = user_settings.get(setting_key) + try: + hours = int(stored) if stored else self.DEFAULT_SUPPRESSION_HOURS + except (ValueError, TypeError): + hours = self.DEFAULT_SUPPRESSION_HOURS + + cutoff = (now - timedelta(hours=hours)).isoformat() + cursor.execute(''' + UPDATE errors + SET resolved_at = ? + WHERE category = ? + AND resolved_at IS NULL + AND last_seen < ? + AND acknowledged = 0 + ''', (now_iso, category, cutoff)) + + # Catch-all: auto-resolve ANY error from an unmapped category + # whose last_seen exceeds DEFAULT_SUPPRESSION_HOURS. + fallback_cutoff = (now - timedelta(hours=self.DEFAULT_SUPPRESSION_HOURS)).isoformat() cursor.execute(''' - UPDATE errors + UPDATE errors SET resolved_at = ? - WHERE category = 'logs' - AND resolved_at IS NULL - AND first_seen < ? + WHERE resolved_at IS NULL AND acknowledged = 0 - ''', (now.isoformat(), cutoff_logs)) + AND last_seen < ? + ''', (now_iso, fallback_cutoff)) # Delete old events (>30 days) cutoff_events = (now - timedelta(days=30)).isoformat() diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py index 4adb63a2..8a47d428 100644 --- a/AppImage/scripts/notification_events.py +++ b/AppImage/scripts/notification_events.py @@ -266,17 +266,24 @@ class JournalWatcher: if re.search(noise, msg, re.IGNORECASE): return + # NOTE: Disk I/O errors (ATA, SCSI, blk_update_request) are NOT handled + # here. They are detected exclusively by HealthMonitor._check_disks_optimized + # which records to health_persistence -> PollingCollector -> notification. + # This avoids duplicate notifications and ensures the health dashboard + # stays in sync with notifications. + # Filesystem errors (EXT4/BTRFS/XFS/ZFS) ARE handled here because they + # indicate corruption, not just hardware I/O problems. + critical_patterns = { r'kernel panic': ('system_problem', 'CRITICAL', 'Kernel panic'), r'Out of memory': ('system_problem', 'CRITICAL', 'Out of memory killer activated'), r'segfault': ('system_problem', 'WARNING', 'Segmentation fault detected'), r'BUG:': ('system_problem', 'CRITICAL', 'Kernel BUG detected'), r'Call Trace:': ('system_problem', 'WARNING', 'Kernel call trace'), - r'I/O error.*dev\s+(\S+)': ('disk_io_error', 'CRITICAL', 'Disk I/O error'), - r'EXT4-fs error': ('disk_io_error', 'CRITICAL', 'Filesystem error'), - r'BTRFS error': ('disk_io_error', 'CRITICAL', 'Filesystem error'), - r'XFS.*error': ('disk_io_error', 'CRITICAL', 'Filesystem error'), - r'ZFS.*error': ('disk_io_error', 'CRITICAL', 'ZFS pool error'), + r'EXT4-fs error': ('system_problem', 'CRITICAL', 'Filesystem error'), + r'BTRFS error': ('system_problem', 'CRITICAL', 'Filesystem error'), + r'XFS.*error': ('system_problem', 'CRITICAL', 'Filesystem error'), + r'ZFS.*error': ('system_problem', 'CRITICAL', 'ZFS pool error'), r'mce:.*Hardware Error': ('system_problem', 'CRITICAL', 'Hardware error (MCE)'), } @@ -286,11 +293,9 @@ class JournalWatcher: entity_id = '' # Build a context-rich reason from the journal message. - # The raw msg contains process name, PID, addresses, library, etc. enriched = reason if 'segfault' in pattern: - # Kernel segfault: "process[PID]: segfault at ADDR ... in lib.so" m = re.search(r'(\S+)\[(\d+)\].*segfault', msg) proc_name = m.group(1) if m else '' proc_pid = m.group(2) if m else '' @@ -305,30 +310,17 @@ class JournalWatcher: enriched = '\n'.join(parts) elif 'Out of memory' in pattern: - # OOM: "Out of memory: Killed process PID (name)" m = re.search(r'Killed process\s+(\d+)\s+\(([^)]+)\)', msg) if m: enriched = f"{reason}\nKilled: {m.group(2)} (PID {m.group(1)})" else: enriched = f"{reason}\n{msg[:300]}" - elif event_type == 'disk_io_error': - # Include device and raw message for disk/fs errors - dev_match = re.search(r'dev\s+(\S+)', msg) - if dev_match: - entity = 'disk' - entity_id = dev_match.group(1) - enriched = f"{reason}\nDevice: {dev_match.group(1)}" - else: - enriched = f"{reason}\n{msg[:300]}" - else: # Generic: include the raw journal message for context enriched = f"{reason}\n{msg[:300]}" data = {'reason': enriched, 'hostname': self._hostname} - if entity == 'disk': - data['device'] = entity_id self._emit(event_type, severity, data, entity=entity, entity_id=entity_id) return @@ -466,19 +458,65 @@ class JournalWatcher: }, entity='cluster', entity_id=node_name) def _check_system_shutdown(self, msg: str, syslog_id: str): - """Detect system shutdown/reboot.""" - if 'systemd-journald' in syslog_id or 'systemd' in syslog_id: - if 'Journal stopped' in msg or 'Stopping Journal Service' in msg: - self._emit('system_shutdown', 'WARNING', { - 'reason': 'System journal stopped', - 'hostname': self._hostname, - }, entity='node', entity_id='') - elif 'Shutting down' in msg or 'System is rebooting' in msg: - event = 'system_reboot' if 'reboot' in msg.lower() else 'system_shutdown' - self._emit(event, 'WARNING', { - 'reason': msg[:200], - 'hostname': self._hostname, - }, entity='node', entity_id='') + """Detect system shutdown/reboot. + + Matches multiple systemd signals that indicate the node is going down: + - "Shutting down." (systemd PID 1) + - "System is powering off." / "System is rebooting." + - "Reached target Shutdown." / "Reached target Reboot." + - "Journal stopped" (very late in shutdown) + - "The system will reboot now!" / "The system will power off now!" + """ + msg_lower = msg.lower() + + # Only process systemd / logind messages + if not any(s in syslog_id for s in ('systemd', 'logind', '')): + if 'systemd' not in msg_lower: + return + + is_reboot = False + is_shutdown = False + + # Detect reboot signals + reboot_signals = [ + 'system is rebooting', + 'reached target reboot', + 'the system will reboot now', + 'starting reboot', + ] + for sig in reboot_signals: + if sig in msg_lower: + is_reboot = True + break + + # Detect shutdown/poweroff signals + if not is_reboot: + shutdown_signals = [ + 'system is powering off', + 'system is halting', + 'shutting down', + 'reached target shutdown', + 'reached target halt', + 'the system will power off now', + 'starting power-off', + 'journal stopped', + 'stopping journal service', + ] + for sig in shutdown_signals: + if sig in msg_lower: + is_shutdown = True + break + + if is_reboot: + self._emit('system_reboot', 'CRITICAL', { + 'reason': msg[:200], + 'hostname': self._hostname, + }, entity='node', entity_id='') + elif is_shutdown: + self._emit('system_shutdown', 'CRITICAL', { + 'reason': msg[:200], + 'hostname': self._hostname, + }, entity='node', entity_id='') def _check_permission_change(self, msg: str, syslog_id: str): """Detect user permission changes in PVE.""" @@ -557,7 +595,8 @@ class TaskWatcher: 'qmreset': ('vm_restart', 'INFO'), 'vzstart': ('ct_start', 'INFO'), 'vzstop': ('ct_stop', 'INFO'), - 'vzshutdown': ('ct_stop', 'INFO'), + 'vzshutdown': ('ct_shutdown', 'INFO'), + 'vzreboot': ('ct_restart', 'INFO'), 'vzdump': ('backup_start', 'INFO'), 'qmsnapshot': ('snapshot_complete', 'INFO'), 'vzsnapshot': ('snapshot_complete', 'INFO'), @@ -741,7 +780,7 @@ class TaskWatcher: # These are backup-induced operations (mode=stop), not user actions. # Exception: if a VM/CT FAILS to start after backup, that IS important. _BACKUP_NOISE = {'vm_start', 'vm_stop', 'vm_shutdown', 'vm_restart', - 'ct_start', 'ct_stop'} + 'ct_start', 'ct_stop', 'ct_shutdown', 'ct_restart'} if event_type in _BACKUP_NOISE and not is_error: if self._is_vzdump_active(): return diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py index 68c10965..3b2bed92 100644 --- a/AppImage/scripts/notification_manager.py +++ b/AppImage/scripts/notification_manager.py @@ -661,6 +661,23 @@ class NotificationManager: if event.event_type in _ALWAYS_DELIVER and cooldown_str is None: cooldown = 10 + # VM/CT state changes are real user actions that should always be + # delivered. Each start/stop/shutdown is a distinct event. A 5s + # cooldown prevents exact duplicates from concurrent watchers. + _STATE_EVENTS = { + 'vm_start', 'vm_stop', 'vm_shutdown', 'vm_restart', + 'ct_start', 'ct_stop', 'ct_shutdown', 'ct_restart', + 'vm_fail', 'ct_fail', + } + if event.event_type in _STATE_EVENTS and cooldown_str is None: + cooldown = 5 + + # System shutdown/reboot must be delivered immediately -- the node + # is going down and there may be only seconds to send the message. + _URGENT_EVENTS = {'system_shutdown', 'system_reboot'} + if event.event_type in _URGENT_EVENTS and cooldown_str is None: + cooldown = 5 + # Check against last sent time using stable fingerprint last_sent = self._cooldowns.get(event.fingerprint, 0) diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py index 7585a758..55371f45 100644 --- a/AppImage/scripts/notification_templates.py +++ b/AppImage/scripts/notification_templates.py @@ -373,6 +373,18 @@ TEMPLATES = { 'group': 'vm_ct', 'default_enabled': False, }, + 'ct_shutdown': { + 'title': '{hostname}: CT {vmid} shutdown', + 'body': '{vmname} ({vmid}) has been shut down.', + 'group': 'vm_ct', + 'default_enabled': False, + }, + 'ct_restart': { + 'title': '{hostname}: CT {vmid} restarted', + 'body': '{vmname} ({vmid}) has been restarted.', + 'group': 'vm_ct', + 'default_enabled': False, + }, 'ct_fail': { 'title': '{hostname}: CT {vmid} FAILED', 'body': '{vmname} ({vmid}) has failed.\n{reason}', @@ -469,7 +481,7 @@ TEMPLATES = { }, 'disk_io_error': { 'title': '{hostname}: Disk I/O error', - 'body': 'I/O error detected on {device}.\n{reason}', + 'body': '{reason}', 'group': 'storage', 'default_enabled': True, }, @@ -730,6 +742,7 @@ def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]: 'security_count': '0', 'total_count': '0', 'package_list': '', 'packages': '', 'pve_packages': '', 'version': '', 'issue_list': '', 'error_key': '', + 'storage_name': '', 'storage_type': '', } variables.update(data)