From c0a882251d36b1f670242559b22361fc756b715a Mon Sep 17 00:00:00 2001
From: MacRimi
Date: Sat, 28 Feb 2026 20:22:24 +0100
Subject: [PATCH] Update notification service

---
Review notes (ignored by git-am, not part of the commit message):
- This copy was reflowed from a whitespace-collapsed patch. Indentation of
  context lines and placement of blank context lines are reconstructed;
  verify against the target files (or regenerate the patch) before applying.
  The index hashes below are those of the original patch.
- _check_active_tasks: a vzdump UPID with an empty id field no longer yields
  a blank guest label / "()" in the notification text.
- start(): errors while pre-loading the active file are printed instead of
  being silently discarded.

 AppImage/scripts/notification_events.py    | 109 ++++++++++++++++++++-
 AppImage/scripts/notification_templates.py |   2 +-
 2 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py
index 09dfa167..609c501b 100644
--- a/AppImage/scripts/notification_events.py
+++ b/AppImage/scripts/notification_events.py
@@ -598,8 +598,9 @@ class JournalWatcher:
 
                 enriched = '\n'.join(parts)
 
+                dev_display = resolved if resolved.startswith('/dev/') else f'/dev/{resolved}'
                 self._emit('disk_io_error', 'CRITICAL', {
-                    'device': resolved,
+                    'device': dev_display,
                     'reason': enriched,
                     'hostname': self._hostname,
                 }, entity='disk', entity_id=resolved)
@@ -881,6 +882,8 @@ class TaskWatcher:
         # to avoid timing gaps.
         self._vzdump_running_since: float = 0  # 0 = no vzdump tracked
         self._vzdump_grace_period = 120  # seconds after vzdump ends to still suppress
+        # Track active-file UPIDs we've already seen, to avoid duplicate backup_start
+        self._seen_active_upids: set = set()
 
     def start(self):
         if self._running:
@@ -894,6 +897,17 @@ class TaskWatcher:
             except OSError:
                 self._last_position = 0
 
+        # Pre-load active UPIDs so we don't fire backup_start for already-running jobs
+        if os.path.exists(self.TASK_ACTIVE):
+            try:
+                with open(self.TASK_ACTIVE, 'r') as f:
+                    for line in f:
+                        upid = line.strip().split()[0] if line.strip() else ''
+                        if upid:
+                            self._seen_active_upids.add(upid)
+            except Exception as e:
+                print(f"[TaskWatcher] Error pre-loading active tasks: {e}")
+
         self._thread = threading.Thread(target=self._watch_loop, daemon=True,
                                         name='task-watcher')
         self._thread.start()
@@ -953,10 +967,13 @@ class TaskWatcher:
         self._vzdump_negative_cache = now
         return False
 
+    TASK_ACTIVE = '/var/log/pve/tasks/active'
+
     def _watch_loop(self):
-        """Poll the task index file for new entries."""
+        """Poll task index for completions AND active file for new starts."""
         while self._running:
             try:
+                # 1. Check index for completed tasks
                 if os.path.exists(self.TASK_LOG):
                     current_size = os.path.getsize(self.TASK_LOG)
 
@@ -972,11 +989,99 @@ class TaskWatcher:
 
                         for line in new_lines:
                             self._process_task_line(line.strip())
+
+                # 2. Check active file for newly started tasks (backup start)
+                self._check_active_tasks()
+
             except Exception as e:
                 print(f"[TaskWatcher] Error reading task log: {e}")
 
             time.sleep(2)  # Check every 2 seconds
 
+    def _check_active_tasks(self):
+        """Scan /var/log/pve/tasks/active for newly started vzdump tasks.
+
+        The 'active' file lists UPIDs of currently running PVE tasks.
+        Format: UPID:node:pid:pstart:starttime:type:id:user:
+        Example: UPID:amd:0018088D:020C7A6E:69A33A76:vzdump:101:root@pam:
+
+        We track seen UPIDs to emit backup_start only once per task,
+        and clean up stale entries when they disappear from the file.
+        A vzdump UPID whose id field is empty is reported as a generic
+        backup job instead of with a blank guest label.
+        """
+        if not os.path.exists(self.TASK_ACTIVE):
+            return
+
+        try:
+            current_upids = set()
+            with open(self.TASK_ACTIVE, 'r') as f:
+                for line in f:
+                    fields = line.split()
+                    if not fields:
+                        continue
+                    # The UPID is the first whitespace-separated field on the line
+                    upid = fields[0]
+                    current_upids.add(upid)
+
+                    # Only care about vzdump (backup) tasks
+                    if ':vzdump:' not in upid:
+                        continue
+
+                    # Already seen this task?
+                    if upid in self._seen_active_upids:
+                        continue
+
+                    self._seen_active_upids.add(upid)
+
+                    # Parse UPID: UPID:node:pid:pstart:starttime:type:id:user:
+                    upid_parts = upid.split(':')
+                    # Index:      0    1    2   3      4         5    6  7
+                    if len(upid_parts) < 8:
+                        continue
+
+                    vmid = upid_parts[6]  # The guest ID being backed up
+                    user = upid_parts[7]
+                    vmname = self._get_vm_name(vmid) if vmid else ''
+
+                    # Track vzdump internally for VM suppression
+                    self._vzdump_running_since = time.time()
+
+                    # Emit backup_start notification. An empty id field would
+                    # otherwise render as "ID " and "()", so use a generic label.
+                    if vmid:
+                        guest_label = vmname if vmname else f'ID {vmid}'
+                        reason = f'Backup started for {guest_label} ({vmid})'
+                    else:
+                        guest_label = 'Backup job'
+                        reason = 'Backup job started'
+                    data = {
+                        'vmid': vmid,
+                        'vmname': guest_label,
+                        'hostname': self._hostname,
+                        'user': user,
+                        'reason': reason,
+                        'target_node': '',
+                        'size': '',
+                        'snapshot_name': '',
+                    }
+
+                    # NOTE(review): only numeric IDs >= 100 map to 'vm', everything
+                    # else to 'ct'; confirm this is the intended VM/CT distinction.
+                    self._queue.put(NotificationEvent(
+                        'backup_start', 'INFO', data,
+                        source='tasks',
+                        entity='vm' if vmid.isdigit() and int(vmid) >= 100 else 'ct',
+                        entity_id=vmid,
+                    ))
+
+            # Cleanup: remove UPIDs that are no longer in the active file
+            stale = self._seen_active_upids - current_upids
+            self._seen_active_upids -= stale
+
+        except Exception as e:
+            print(f"[TaskWatcher] Error reading active tasks: {e}")
+
     def _process_task_line(self, line: str):
         """Process a single task index line.
 
diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py
index 153e78f0..52110597 100644
--- a/AppImage/scripts/notification_templates.py
+++ b/AppImage/scripts/notification_templates.py
@@ -480,7 +480,7 @@ TEMPLATES = {
         'default_enabled': True,
     },
     'disk_io_error': {
-        'title': '{hostname}: Disk I/O error on /dev/{device}',
+        'title': '{hostname}: Disk I/O error on {device}',
         'body': '{reason}',
         'group': 'storage',
         'default_enabled': True,