update notification service

This commit is contained in:
MacRimi
2026-03-01 17:24:13 +01:00
parent 3e96a89adf
commit bcf5395868
5 changed files with 302 additions and 159 deletions

View File

@@ -69,7 +69,7 @@ def acknowledge_error():
# Use the error's category to clear the correct cache
category = result.get('category', '')
cache_key_map = {
'logs': 'system_logs',
'logs': 'logs_analysis',
'pve_services': 'pve_services',
'updates': 'updates_check',
'security': 'security_check',

View File

@@ -1225,6 +1225,54 @@ class HealthMonitor:
except Exception:
return 'UNKNOWN'
def _check_all_disks_smart(self, fallback: str = 'UNKNOWN') -> str:
    """Check SMART health of ALL physical disks.

    Used when an ATA port can't be resolved to a specific /dev/sdX.
    If ALL disks report PASSED, returns 'PASSED' (errors are transient).
    If ANY disk reports FAILED, returns 'FAILED'.
    Otherwise returns the fallback value.
    """
    try:
        # Enumerate whole block devices only (no partitions/loop/zram/dm).
        proc = subprocess.run(
            ['lsblk', '-dnpo', 'NAME,TYPE'],
            capture_output=True, text=True, timeout=3
        )
        if proc.returncode != 0:
            return fallback

        devices = [
            fields[0]  # e.g. /dev/sda
            for fields in (ln.split() for ln in proc.stdout.strip().split('\n'))
            if len(fields) >= 2 and fields[1] == 'disk'
        ]
        if not devices:
            return fallback

        passed_count = 0
        unknown_seen = False
        for device in devices:
            verdict = self._quick_smart_health(device)
            if verdict == 'FAILED':
                # Any failing disk dominates: report the failure immediately.
                return 'FAILED'
            if verdict == 'PASSED':
                passed_count += 1
            else:
                # Can't confirm this disk's health.
                unknown_seen = True

        if passed_count and not unknown_seen:
            return 'PASSED'
        return fallback
    except Exception:
        return fallback
def _check_disks_optimized(self) -> Dict[str, Any]:
"""
Disk I/O error check -- the SINGLE source of truth for disk errors.
@@ -1240,12 +1288,18 @@ class HealthMonitor:
current_time = time.time()
disk_results = {} # Single dict for both WARNING and CRITICAL
# Common transient ATA patterns that auto-recover and are not real disk failures
# 'action 0x0' means the kernel recovered automatically with no action needed
# Common transient ATA patterns that auto-recover and are not real disk failures.
# These are bus/controller level events, NOT media errors:
# action 0x0 = no action needed (fully recovered)
# action 0x6 = hard reset + port reinit (common cable/connector recovery)
# SError with BadCRC/Dispar = signal integrity issue (cable, not disk)
# Emask 0x10 = ATA bus error (controller/interconnect, not media)
TRANSIENT_PATTERNS = [
re.compile(r'exception\s+emask.*action\s+0x0', re.IGNORECASE),
re.compile(r'exception\s+emask.*action\s+0x[06]', re.IGNORECASE),
re.compile(r'serror.*=.*0x[0-9a-f]+\s*\(', re.IGNORECASE),
re.compile(r'SError:.*\{.*\}', re.IGNORECASE),
re.compile(r'SError:.*\{.*(?:BadCRC|Dispar|CommWake).*\}', re.IGNORECASE),
re.compile(r'emask\s+0x10\s+\(ATA bus error\)', re.IGNORECASE),
re.compile(r'failed command:\s*READ FPDMA QUEUED', re.IGNORECASE),
]
try:
@@ -1328,9 +1382,32 @@ class HealthMonitor:
if error_count >= 1:
# Cross-reference with SMART to determine real severity
smart_health = self._quick_smart_health(disk)
# If SMART is UNKNOWN (unresolved ATA port), check ALL
# physical disks. If every disk passes SMART, the ATA
# errors are transient bus/controller noise.
if smart_health == 'UNKNOWN':
smart_health = self._check_all_disks_smart(smart_health)
smart_ok = smart_health == 'PASSED'
if smart_ok:
# Transient-only errors (e.g. SError with auto-recovery)
# are always INFO regardless of SMART
if all_transient:
reason = f'{display}: {error_count} transient ATA event(s) in 5 min (auto-recovered)'
if sample:
reason += f'\n{sample}'
health_persistence.resolve_error(error_key, 'Transient ATA events, auto-recovered')
disk_results[display] = {
'status': 'INFO',
'reason': reason,
'device': disk,
'error_count': error_count,
'smart_status': smart_health,
'dismissable': False,
'error_key': error_key,
}
elif smart_ok:
# SMART is healthy -> dmesg errors are informational only
# The disk is fine; these are transient controller/bus events
reason = f'{display}: {error_count} I/O event(s) in 5 min (SMART: OK)'
@@ -1349,11 +1426,10 @@ class HealthMonitor:
'dismissable': False,
'error_key': error_key,
}
else:
# SMART is FAILED or UNKNOWN with I/O errors -> real problem
severity = 'CRITICAL' if error_count >= 3 else 'WARNING'
smart_note = f'SMART: {smart_health}' if smart_health != 'UNKNOWN' else 'SMART: check manually'
reason = f'{display}: {error_count} I/O error(s) in 5 min ({smart_note})'
elif smart_health == 'FAILED':
# SMART confirms a real disk failure
severity = 'CRITICAL'
reason = f'{display}: {error_count} I/O error(s) in 5 min (SMART: FAILED)'
if sample:
reason += f'\n{sample}'
@@ -1365,7 +1441,7 @@ class HealthMonitor:
details={'disk': disk, 'device': display,
'error_count': error_count,
'smart_status': smart_health,
'sample': sample, 'dismissable': severity != 'CRITICAL'}
'sample': sample, 'dismissable': False}
)
disk_results[display] = {
'status': severity,
@@ -1373,7 +1449,35 @@ class HealthMonitor:
'device': disk,
'error_count': error_count,
'smart_status': smart_health,
'dismissable': severity != 'CRITICAL',
'dismissable': False,
'error_key': error_key,
}
else:
# SMART is genuinely UNKNOWN (no disk resolved, no
# smartctl at all) -- treat as WARNING, not CRITICAL.
# These are likely transient and will auto-resolve.
severity = 'WARNING'
reason = f'{display}: {error_count} I/O event(s) in 5 min (SMART: unavailable)'
if sample:
reason += f'\n{sample}'
health_persistence.record_error(
error_key=error_key,
category='disks',
severity=severity,
reason=reason,
details={'disk': disk, 'device': display,
'error_count': error_count,
'smart_status': smart_health,
'sample': sample, 'dismissable': True}
)
disk_results[display] = {
'status': severity,
'reason': reason,
'device': disk,
'error_count': error_count,
'smart_status': smart_health,
'dismissable': True,
'error_key': error_key,
}
else:
@@ -1816,11 +1920,13 @@ class HealthMonitor:
# Get persistent errors first
persistent_errors = health_persistence.get_active_errors('vms')
# Check if any persistent VMs/CTs have started
# Check if any persistent VMs/CTs have started or were dismissed
for error in persistent_errors:
error_key = error['error_key']
if error_key.startswith('vm_') or error_key.startswith('ct_'):
vm_id = error_key.split('_')[1]
is_acknowledged = error.get('acknowledged') == 1
if error_key.startswith(('vm_', 'ct_', 'vmct_')):
vm_id = error_key.split('_', 1)[1]
# Check if VM is running using persistence helper
if health_persistence.check_vm_running(vm_id):
continue # Error auto-resolved if VM is now running
@@ -1831,9 +1937,12 @@ class HealthMonitor:
'reason': error['reason'],
'id': error.get('details', {}).get('id', 'unknown'),
'type': error.get('details', {}).get('type', 'VM/CT'),
'first_seen': error['first_seen']
'first_seen': error['first_seen'],
'dismissed': is_acknowledged,
}
issues.append(f"{error.get('details', {}).get('type', 'VM')} {error.get('details', {}).get('id', '')}: {error['reason']}")
# Only add to issues if not dismissed
if not is_acknowledged:
issues.append(f"{error.get('details', {}).get('type', 'VM')} {error.get('details', {}).get('id', '')}: {error['reason']}")
# Check for new errors in logs
# Using 'warning' priority to catch potential startup issues
@@ -1958,24 +2067,31 @@ class HealthMonitor:
}
# Build checks dict from vm_details
# 'key' is the persistence error_key (e.g. 'qmp_110', 'ct_101', 'vm_110')
checks = {}
for key, val in vm_details.items():
vm_label = f"{val.get('type', 'VM')} {val.get('id', key)}"
is_dismissed = val.get('dismissed', False)
checks[vm_label] = {
'status': val.get('status', 'WARNING'),
'status': 'INFO' if is_dismissed else val.get('status', 'WARNING'),
'detail': val.get('reason', 'Error'),
'dismissable': True,
'error_key': vm_label
'dismissed': is_dismissed,
'error_key': key # Must match the persistence DB key
}
if not issues:
checks['qmp_communication'] = {'status': 'OK', 'detail': 'No QMP timeouts detected'}
checks['container_startup'] = {'status': 'OK', 'detail': 'No container startup errors'}
checks['vm_startup'] = {'status': 'OK', 'detail': 'No VM startup failures'}
checks['oom_killer'] = {'status': 'OK', 'detail': 'No OOM events detected'}
# No active (non-dismissed) issues
if not checks:
checks['qmp_communication'] = {'status': 'OK', 'detail': 'No QMP timeouts detected'}
checks['container_startup'] = {'status': 'OK', 'detail': 'No container startup errors'}
checks['vm_startup'] = {'status': 'OK', 'detail': 'No VM startup failures'}
checks['oom_killer'] = {'status': 'OK', 'detail': 'No OOM events detected'}
return {'status': 'OK', 'checks': checks}
has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
# Only consider non-dismissed items for overall severity
active_details = {k: v for k, v in vm_details.items() if not v.get('dismissed')}
has_critical = any(d.get('status') == 'CRITICAL' for d in active_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
@@ -2532,10 +2648,21 @@ class HealthMonitor:
}
# Recalculate overall status considering dismissed items
active_issues = [k for k, v in log_checks.items() if v['status'] in ('WARNING', 'CRITICAL')]
active_issues = {k: v for k, v in log_checks.items() if v['status'] in ('WARNING', 'CRITICAL')}
if not active_issues:
status = 'OK'
reason = None
else:
# Recalculate status and reason from only non-dismissed sub-checks
has_critical = any(v['status'] == 'CRITICAL' for v in active_issues.values())
status = 'CRITICAL' if has_critical else 'WARNING'
# Rebuild reason from active (non-dismissed) checks only
active_reasons = []
for k, v in active_issues.items():
detail = v.get('detail', '')
if detail:
active_reasons.append(detail)
reason = '; '.join(active_reasons[:3]) if active_reasons else None
log_result = {'status': status, 'checks': log_checks}
if reason:

View File

@@ -419,7 +419,7 @@ class HealthPersistence:
for cat, prefix in [('updates', 'security_updates'), ('updates', 'system_age'),
('updates', 'pending_updates'), ('updates', 'kernel_pve'),
('security', 'security_'),
('pve_services', 'pve_service_'), ('vms', 'vm_'), ('vms', 'ct_'),
('pve_services', 'pve_service_'), ('vms', 'vmct_'), ('vms', 'vm_'), ('vms', 'ct_'),
('disks', 'disk_'), ('logs', 'log_'), ('network', 'net_'),
('temperature', 'temp_')]:
if error_key == prefix or error_key.startswith(prefix):
@@ -488,6 +488,22 @@ class HealthPersistence:
'suppression_hours': sup_hours
})
# Cascade acknowledge: when dismissing a group check
# (e.g. log_persistent_errors), also dismiss all individual
# sub-errors that share the same prefix in the DB.
# Currently only persistent errors have per-pattern sub-records
# (e.g. log_persistent_a1b2c3d4).
CASCADE_PREFIXES = {
'log_persistent_errors': 'log_persistent_',
}
child_prefix = CASCADE_PREFIXES.get(error_key)
if child_prefix:
cursor.execute('''
UPDATE errors
SET acknowledged = 1, resolved_at = ?, suppression_hours = ?
WHERE error_key LIKE ? AND acknowledged = 0
''', (now, sup_hours, child_prefix + '%'))
result = {
'success': True,
'error_key': error_key,

View File

@@ -189,6 +189,7 @@ class JournalWatcher:
self._check_system_shutdown(msg, syslog_id)
self._check_permission_change(msg, syslog_id)
self._check_firewall(msg, syslog_id)
self._check_backup_start(msg, syslog_id)
def _process_plain(self, line: str):
"""Fallback: process a plain text log line."""
@@ -690,6 +691,103 @@ class JournalWatcher:
return ''
def _check_backup_start(self, msg: str, syslog_id: str):
"""Detect backup job start from pvedaemon journal messages.
Matches: INFO: starting new backup job: vzdump 110 --storage PBS-Cloud --mode stop ...
PVE always emits this message from pvedaemon for BOTH scheduled and
manual backups. It contains the full guest list and all parameters.
The UPID "starting task" message is ignored because it arrives first
but lacks storage/mode/compression details.
"""
if syslog_id != 'pvedaemon':
return
match = re.match(r'INFO: starting new backup job: vzdump\s+(.*)', msg)
if not match:
return
raw_args = match.group(1)
# Parse the vzdump arguments from the log message
guests = []
storage = ''
mode = ''
compress = ''
args = raw_args.split()
i = 0
while i < len(args):
arg = args[i]
if arg.isdigit():
guests.append(arg)
elif arg == '--storage' and i + 1 < len(args):
storage = args[i + 1]
i += 1
elif arg == '--mode' and i + 1 < len(args):
mode = args[i + 1]
i += 1
elif arg == '--compress' and i + 1 < len(args):
compress = args[i + 1]
i += 1
elif arg == '--all' and i + 1 < len(args):
if args[i + 1] == '1':
guests = ['all']
i += 1
i += 1
# Build the notification body
reason_parts = []
if guests:
if guests == ['all']:
reason_parts.append('Guests: All VMs/CTs')
else:
guest_lines = []
for gid in guests:
gname = self._resolve_vm_name(gid)
if gname:
guest_lines.append(f' {gname} ({gid})')
else:
guest_lines.append(f' ID {gid}')
reason_parts.append('Guests:\n' + '\n'.join(guest_lines))
details = []
if storage:
details.append(f'Storage: {storage}')
if mode:
details.append(f'Mode: {mode}')
if compress:
details.append(f'Compression: {compress}')
if details:
reason_parts.append(' | '.join(details))
reason = '\n'.join(reason_parts) if reason_parts else 'Backup job started'
self._emit('backup_start', 'INFO', {
'vmid': ', '.join(guests),
'vmname': '',
'hostname': self._hostname,
'user': '',
'reason': reason,
}, entity='backup', entity_id='vzdump')
def _resolve_vm_name(self, vmid: str) -> str:
"""Try to resolve a VMID to its name from PVE config files."""
if not vmid or not vmid.isdigit():
return ''
for base in ['/etc/pve/qemu-server', '/etc/pve/lxc']:
conf = f'{base}/{vmid}.conf'
try:
with open(conf, 'r') as f:
for line in f:
if line.startswith('name:') or line.startswith('hostname:'):
return line.split(':', 1)[1].strip()
except (OSError, IOError):
pass
return ''
def _check_cluster_events(self, msg: str, syslog_id: str):
"""Detect cluster split-brain and node disconnect."""
msg_lower = msg.lower()
@@ -955,7 +1053,8 @@ class TaskWatcher:
parts = line.strip().split(':')
if len(parts) >= 3:
try:
pid = int(parts[2])
# PID in UPID is HEXADECIMAL
pid = int(parts[2], 16)
os.kill(pid, 0) # Signal 0 = just check existence
self._vzdump_active_cache = now
return True
@@ -999,24 +1098,22 @@ class TaskWatcher:
time.sleep(2) # Check every 2 seconds
def _check_active_tasks(self):
"""Scan /var/log/pve/tasks/active for newly started vzdump tasks.
"""Scan /var/log/pve/tasks/active to track vzdump for VM suppression.
The 'active' file lists UPIDs of currently running PVE tasks.
Format: UPID:node:pid:pstart:starttime:type:id:user:
This does NOT emit backup_start notifications (the JournalWatcher
handles that via the 'starting new backup job' log message, which
contains the full guest list and parameters).
For multi-VM backups (`vzdump 100 101`), PVE creates:
- A main UPID with EMPTY vmid: UPID:amd:PID:...:vzdump::root@pam:
- Per-VM sub-UPIDs: UPID:amd:PID:...:vzdump:101:root@pam:
We only emit backup_start for the MAIN job (empty vmid) and read
/proc/PID/cmdline to discover which VMs are being backed up.
Per-VM sub-UPIDs are tracked but don't trigger a notification.
This only keeps _vzdump_running_since updated so that
_is_vzdump_active() can suppress VM start/stop notifications
during backup operations.
"""
if not os.path.exists(self.TASK_ACTIVE):
return
try:
current_upids = set()
found_vzdump = False
with open(self.TASK_ACTIVE, 'r') as f:
for line in f:
line = line.strip()
@@ -1025,133 +1122,22 @@ class TaskWatcher:
upid = line.split()[0] if line.split() else line
current_upids.add(upid)
if ':vzdump:' not in upid:
continue
if upid in self._seen_active_upids:
continue
self._seen_active_upids.add(upid)
# Parse UPID: UPID:node:pid:pstart:starttime:type:id:user:
upid_parts = upid.split(':')
if len(upid_parts) < 8:
continue
vmid = upid_parts[6] # Empty for main job, vmid for sub-task
user = upid_parts[7]
pid_hex = upid_parts[2]
# Track vzdump internally for VM suppression
self._vzdump_running_since = time.time()
# Only emit notification for the MAIN job (empty vmid).
# Per-VM sub-tasks are tracked for suppression but don't notify
# (otherwise you'd get N+1 notifications for N VMs).
if vmid:
continue
# Read /proc/PID/cmdline to discover which VMs and settings
backup_info = self._parse_vzdump_cmdline(pid_hex)
# Build the notification body
reason_parts = []
if backup_info.get('guests'):
guest_lines = []
for gid in backup_info['guests']:
gname = self._get_vm_name(gid)
if gname:
guest_lines.append(f' {gname} ({gid})')
else:
guest_lines.append(f' ID {gid}')
reason_parts.append('Guests:\n' + '\n'.join(guest_lines))
details = []
if backup_info.get('storage'):
details.append(f'Storage: {backup_info["storage"]}')
if backup_info.get('mode'):
details.append(f'Mode: {backup_info["mode"]}')
if backup_info.get('compress'):
details.append(f'Compression: {backup_info["compress"]}')
if details:
reason_parts.append(' | '.join(details))
if not reason_parts:
reason_parts.append('Backup job started')
reason = '\n'.join(reason_parts)
data = {
'vmid': ', '.join(backup_info.get('guests', [])),
'vmname': '',
'hostname': self._hostname,
'user': user,
'reason': reason,
'target_node': '',
'size': '',
'snapshot_name': '',
}
self._queue.put(NotificationEvent(
'backup_start', 'INFO', data,
source='tasks',
entity='backup',
entity_id='vzdump',
))
if ':vzdump:' in upid:
found_vzdump = True
# Keep _vzdump_running_since fresh as long as vzdump is in active
if found_vzdump:
self._vzdump_running_since = time.time()
# Cleanup stale UPIDs
stale = self._seen_active_upids - current_upids
self._seen_active_upids -= stale
# Track new ones
self._seen_active_upids |= current_upids
except Exception as e:
print(f"[TaskWatcher] Error reading active tasks: {e}")
@staticmethod
def _parse_vzdump_cmdline(pid_hex: str) -> dict:
"""Read /proc/PID/cmdline to extract vzdump parameters.
Returns dict with keys: guests (list), storage, mode, compress, all.
"""
info: dict = {'guests': [], 'storage': '', 'mode': '', 'compress': ''}
try:
pid = int(pid_hex, 16)
cmdline_path = f'/proc/{pid}/cmdline'
if not os.path.exists(cmdline_path):
return info
with open(cmdline_path, 'rb') as f:
raw = f.read()
# cmdline is null-byte separated
args = raw.decode('utf-8', errors='replace').split('\0')
args = [a for a in args if a] # remove empty
# Parse: vzdump VMID1 VMID2 --storage local --mode stop ...
i = 0
while i < len(args):
arg = args[i]
if arg.isdigit():
info['guests'].append(arg)
elif arg == '--storage' and i + 1 < len(args):
info['storage'] = args[i + 1]
i += 1
elif arg == '--mode' and i + 1 < len(args):
info['mode'] = args[i + 1]
i += 1
elif arg == '--compress' and i + 1 < len(args):
info['compress'] = args[i + 1]
i += 1
elif arg == '--all' and i + 1 < len(args):
if args[i + 1] == '1':
info['guests'] = ['all']
i += 1
i += 1
return info
except Exception:
return info
def _process_task_line(self, line: str):
"""Process a single task index line.
@@ -1431,8 +1417,21 @@ class PollingCollector:
if severity in ('INFO', 'OK'):
continue
# Skip dismissed/acknowledged errors -- the user already handled these
if error.get('acknowledged') == 1:
continue
# On first poll, seed _last_notified for all existing errors so we
# don't re-notify old persistent errors that were already sent before
# a service restart. Only genuinely NEW errors (appearing after the
# first poll) will trigger immediate notifications.
if not self._first_poll_done:
if error_key not in self._last_notified:
self._last_notified[error_key] = now
continue
# Determine if we should notify
is_new = error_key not in self._known_errors and self._first_poll_done
is_new = error_key not in self._known_errors
last_sent = self._last_notified.get(error_key, 0)
is_due = (now - last_sent) >= self.DIGEST_INTERVAL

View File

@@ -651,7 +651,8 @@ class NotificationManager:
parts = line.strip().split(':')
if len(parts) >= 3:
try:
pid = int(parts[2])
# PID in UPID is HEXADECIMAL
pid = int(parts[2], 16)
os.kill(pid, 0)
return True
except (ValueError, ProcessLookupError, PermissionError):