mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-04-25 08:56:21 +00:00
Update notification service
This commit is contained in:
@@ -23,6 +23,7 @@ import time
|
||||
import threading
|
||||
import urllib.parse
|
||||
import hardware_monitor
|
||||
import health_persistence
|
||||
import xml.etree.ElementTree as ET
|
||||
from datetime import datetime, timedelta
|
||||
from functools import wraps
|
||||
@@ -1159,19 +1160,66 @@ def get_storage_info():
|
||||
'ssd_life_left': smart_data.get('ssd_life_left') # Added
|
||||
}
|
||||
|
||||
storage_data['disk_count'] += 1
|
||||
health = smart_data.get('health', 'unknown').lower()
|
||||
if health == 'healthy':
|
||||
storage_data['healthy_disks'] += 1
|
||||
elif health == 'warning':
|
||||
storage_data['warning_disks'] += 1
|
||||
elif health in ['critical', 'failed']:
|
||||
storage_data['critical_disks'] += 1
|
||||
|
||||
except Exception as e:
|
||||
# print(f"Error getting disk list: {e}")
|
||||
pass
|
||||
|
||||
# Enrich physical disks with active I/O errors from health_persistence.
|
||||
# This is the single source of truth -- health_monitor detects ATA/SCSI/IO
|
||||
# errors via dmesg, records them in health_persistence, and we read them here.
|
||||
try:
|
||||
active_disk_errors = health_persistence.get_active_errors(category='disks')
|
||||
for err in active_disk_errors:
|
||||
details = err.get('details', {})
|
||||
if isinstance(details, str):
|
||||
try:
|
||||
details = json.loads(details)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
details = {}
|
||||
|
||||
err_device = details.get('disk', '')
|
||||
error_count = details.get('error_count', 0)
|
||||
sample = details.get('sample', '')
|
||||
severity = err.get('severity', 'WARNING')
|
||||
|
||||
# Match error to physical disk.
|
||||
# err_device can be 'sda', 'nvme0n1', or 'ata8' (if resolution failed)
|
||||
matched_disk = None
|
||||
if err_device in physical_disks:
|
||||
matched_disk = err_device
|
||||
else:
|
||||
# Try partial match: 'sda' matches disk 'sda'
|
||||
for dk in physical_disks:
|
||||
if dk == err_device or err_device.startswith(dk):
|
||||
matched_disk = dk
|
||||
break
|
||||
|
||||
if matched_disk:
|
||||
physical_disks[matched_disk]['io_errors'] = {
|
||||
'count': error_count,
|
||||
'severity': severity,
|
||||
'sample': sample,
|
||||
'reason': err.get('reason', ''),
|
||||
}
|
||||
# Override health status if I/O errors are more severe
|
||||
current_health = physical_disks[matched_disk].get('health', 'unknown').lower()
|
||||
if severity == 'CRITICAL' and current_health != 'critical':
|
||||
physical_disks[matched_disk]['health'] = 'critical'
|
||||
elif severity == 'WARNING' and current_health in ('healthy', 'unknown'):
|
||||
physical_disks[matched_disk]['health'] = 'warning'
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Count disk health states AFTER I/O error enrichment
|
||||
for disk_name, disk_info in physical_disks.items():
|
||||
storage_data['disk_count'] += 1
|
||||
health = disk_info.get('health', 'unknown').lower()
|
||||
if health == 'healthy':
|
||||
storage_data['healthy_disks'] += 1
|
||||
elif health == 'warning':
|
||||
storage_data['warning_disks'] += 1
|
||||
elif health in ['critical', 'failed']:
|
||||
storage_data['critical_disks'] += 1
|
||||
|
||||
storage_data['total'] = round(total_disk_size_bytes / (1024**4), 1)
|
||||
|
||||
# Get disk usage for mounted partitions
|
||||
|
||||
@@ -324,6 +324,12 @@ class HealthMonitor:
|
||||
Returns JSON structure with ALL 10 categories always present.
|
||||
Now includes persistent error tracking.
|
||||
"""
|
||||
# Run cleanup on every status check to auto-resolve stale errors
|
||||
try:
|
||||
health_persistence.cleanup_old_errors()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
active_errors = health_persistence.get_active_errors()
|
||||
# No need to create persistent_issues dict here, it's implicitly handled by the checks
|
||||
|
||||
@@ -1087,16 +1093,67 @@ class HealthMonitor:
|
||||
|
||||
return storages
|
||||
|
||||
def _resolve_ata_to_disk(self, ata_port: str) -> str:
|
||||
"""Resolve an ATA controller name (e.g. 'ata8') to a block device (e.g. 'sda').
|
||||
|
||||
Uses /sys/class/ata_port/ symlinks and /sys/block/ to find the mapping.
|
||||
Falls back to parsing dmesg for 'ata8: SATA link up' -> 'sd 7:0:0:0: [sda]'.
|
||||
"""
|
||||
if not ata_port or not ata_port.startswith('ata'):
|
||||
return ata_port
|
||||
|
||||
port_num = ata_port.replace('ata', '')
|
||||
|
||||
# Method 1: Walk /sys/class/ata_port/ -> host -> target -> block
|
||||
try:
|
||||
ata_path = f'/sys/class/ata_port/{ata_port}'
|
||||
if os.path.exists(ata_path):
|
||||
device_path = os.path.realpath(ata_path)
|
||||
# Walk up to find the SCSI host, then find block devices
|
||||
# Path: /sys/devices/.../ataX/hostY/targetY:0:0/Y:0:0:0/block/sdZ
|
||||
for root, dirs, files in os.walk(os.path.dirname(device_path)):
|
||||
if 'block' in dirs:
|
||||
block_path = os.path.join(root, 'block')
|
||||
devs = os.listdir(block_path)
|
||||
if devs:
|
||||
return devs[0] # e.g. 'sda'
|
||||
except (OSError, IOError):
|
||||
pass
|
||||
|
||||
# Method 2: Parse dmesg for ATA link messages
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['dmesg', '--notime'],
|
||||
capture_output=True, text=True, timeout=2
|
||||
)
|
||||
if result.returncode == 0:
|
||||
# Look for "ata8: SATA link up" followed by "sd X:0:0:0: [sda]"
|
||||
lines = result.stdout.split('\n')
|
||||
host_num = None
|
||||
for line in lines:
|
||||
m = re.search(rf'{ata_port}:\s+SATA link', line)
|
||||
if m:
|
||||
# ata port number maps to host(N-1) typically
|
||||
host_num = int(port_num) - 1
|
||||
if host_num is not None:
|
||||
m2 = re.search(rf'sd\s+{host_num}:\d+:\d+:\d+:\s+\[(\w+)\]', line)
|
||||
if m2:
|
||||
return m2.group(1)
|
||||
except (OSError, subprocess.TimeoutExpired):
|
||||
pass
|
||||
|
||||
return ata_port # Return original if resolution fails
|
||||
|
||||
def _check_disks_optimized(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Optimized disk check - always returns status.
|
||||
Checks dmesg for I/O errors and SMART status.
|
||||
NOTE: This function is now largely covered by _check_storage_optimized,
|
||||
but kept for potential specific disk-level reporting if needed.
|
||||
Currently, its primary function is to detect recent I/O errors.
|
||||
Disk I/O error check -- the SINGLE source of truth for disk errors.
|
||||
|
||||
Reads dmesg for I/O/ATA/SCSI errors, counts per device, records in
|
||||
health_persistence, and returns status for the health dashboard.
|
||||
Resolves ATA controller names (ata8) to physical disks (sda).
|
||||
"""
|
||||
current_time = time.time()
|
||||
disk_issues = {}
|
||||
disk_results = {} # Single dict for both WARNING and CRITICAL
|
||||
|
||||
try:
|
||||
# Check dmesg for I/O errors in the last 5 minutes
|
||||
@@ -1107,17 +1164,52 @@ class HealthMonitor:
|
||||
timeout=2
|
||||
)
|
||||
|
||||
# Collect a sample line per device for richer error messages
|
||||
disk_samples = {}
|
||||
|
||||
if result.returncode == 0:
|
||||
for line in result.stdout.split('\n'):
|
||||
line_lower = line.lower()
|
||||
if any(keyword in line_lower for keyword in ['i/o error', 'ata error', 'scsi error', 'medium error']):
|
||||
# Try to extract disk name
|
||||
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+)', line)
|
||||
if disk_match:
|
||||
disk_name = disk_match.group(1)
|
||||
# Detect various disk error formats
|
||||
is_disk_error = any(kw in line_lower for kw in [
|
||||
'i/o error', 'scsi error', 'medium error',
|
||||
'failed command:', 'exception emask',
|
||||
])
|
||||
ata_match = re.search(r'(ata\d+)[\.\d]*:.*(?:error|failed|exception)', line_lower)
|
||||
if ata_match:
|
||||
is_disk_error = True
|
||||
|
||||
if is_disk_error:
|
||||
# Extract device from multiple formats
|
||||
raw_device = None
|
||||
for dev_re in [
|
||||
r'dev\s+(sd[a-z]+)', # dev sdb
|
||||
r'\[(sd[a-z]+)\]', # [sda]
|
||||
r'/dev/(sd[a-z]+)', # /dev/sda
|
||||
r'(nvme\d+n\d+)', # nvme0n1
|
||||
r'device\s+(sd[a-z]+\d*)', # device sda1
|
||||
r'(ata\d+)', # ata8 (ATA controller)
|
||||
]:
|
||||
dm = re.search(dev_re, line)
|
||||
if dm:
|
||||
raw_device = dm.group(1)
|
||||
break
|
||||
|
||||
if raw_device:
|
||||
# Resolve ATA port to physical disk name
|
||||
if raw_device.startswith('ata'):
|
||||
resolved = self._resolve_ata_to_disk(raw_device)
|
||||
disk_name = resolved
|
||||
else:
|
||||
disk_name = raw_device.rstrip('0123456789') if raw_device.startswith('sd') else raw_device
|
||||
|
||||
self.io_error_history[disk_name].append(current_time)
|
||||
if disk_name not in disk_samples:
|
||||
# Clean the sample: strip dmesg timestamp prefix
|
||||
clean = re.sub(r'^\[.*?\]\s*', '', line.strip())
|
||||
disk_samples[disk_name] = clean[:200]
|
||||
|
||||
# Clean old history (keep errors from the last 5 minutes)
|
||||
# Clean old history and evaluate per-disk status
|
||||
for disk in list(self.io_error_history.keys()):
|
||||
self.io_error_history[disk] = [
|
||||
t for t in self.io_error_history[disk]
|
||||
@@ -1125,57 +1217,66 @@ class HealthMonitor:
|
||||
]
|
||||
|
||||
error_count = len(self.io_error_history[disk])
|
||||
error_key = f'disk_{disk}'
|
||||
sample = disk_samples.get(disk, '')
|
||||
display = f'/dev/{disk}' if not disk.startswith('/') else disk
|
||||
|
||||
# Report based on recent error count
|
||||
if error_count >= 3:
|
||||
error_key = f'disk_{disk}'
|
||||
severity = 'CRITICAL'
|
||||
reason = f'{error_count} I/O errors in 5 minutes'
|
||||
reason = f'{display}: {error_count} I/O errors in 5 min'
|
||||
if sample:
|
||||
reason += f'\n{sample}'
|
||||
|
||||
health_persistence.record_error(
|
||||
error_key=error_key,
|
||||
category='disks',
|
||||
severity=severity,
|
||||
reason=reason,
|
||||
details={'disk': disk, 'error_count': error_count, 'dismissable': False}
|
||||
details={'disk': disk, 'device': display,
|
||||
'error_count': error_count,
|
||||
'sample': sample, 'dismissable': False}
|
||||
)
|
||||
|
||||
disk_details[disk] = {
|
||||
disk_results[display] = {
|
||||
'status': severity,
|
||||
'reason': reason,
|
||||
'dismissable': False
|
||||
'device': disk,
|
||||
'error_count': error_count,
|
||||
'dismissable': False,
|
||||
}
|
||||
elif error_count >= 1:
|
||||
error_key = f'disk_{disk}'
|
||||
severity = 'WARNING'
|
||||
reason = f'{error_count} I/O error(s) in 5 minutes'
|
||||
reason = f'{display}: {error_count} I/O error(s) in 5 min'
|
||||
if sample:
|
||||
reason += f'\n{sample}'
|
||||
|
||||
health_persistence.record_error(
|
||||
error_key=error_key,
|
||||
category='disks',
|
||||
severity=severity,
|
||||
reason=reason,
|
||||
details={'disk': disk, 'error_count': error_count, 'dismissable': True}
|
||||
details={'disk': disk, 'device': display,
|
||||
'error_count': error_count,
|
||||
'sample': sample, 'dismissable': True}
|
||||
)
|
||||
|
||||
disk_issues[f'/dev/{disk}'] = {
|
||||
disk_results[display] = {
|
||||
'status': severity,
|
||||
'reason': reason,
|
||||
'dismissable': True
|
||||
'device': disk,
|
||||
'error_count': error_count,
|
||||
'dismissable': True,
|
||||
}
|
||||
else:
|
||||
error_key = f'disk_{disk}'
|
||||
health_persistence.resolve_error(error_key, 'Disk errors cleared')
|
||||
|
||||
if not disk_issues:
|
||||
if not disk_results:
|
||||
return {'status': 'OK'}
|
||||
|
||||
has_critical = any(d.get('status') == 'CRITICAL' for d in disk_issues.values())
|
||||
has_critical = any(d.get('status') == 'CRITICAL' for d in disk_results.values())
|
||||
|
||||
return {
|
||||
'status': 'CRITICAL' if has_critical else 'WARNING',
|
||||
'reason': f"{len(disk_issues)} disk(s) with recent errors",
|
||||
'details': disk_issues
|
||||
'reason': f"{len(disk_results)} disk(s) with recent errors",
|
||||
'details': disk_results
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
@@ -1418,6 +1519,19 @@ class HealthMonitor:
|
||||
Improved detection of container and VM errors from journalctl.
|
||||
"""
|
||||
try:
|
||||
# First: auto-resolve any persisted VM/CT errors where the guest
|
||||
# is now running. This clears stale "Failed to start" / QMP
|
||||
# errors that are no longer relevant.
|
||||
try:
|
||||
active_vm_errors = health_persistence.get_active_errors('vms')
|
||||
for err in active_vm_errors:
|
||||
details = err.get('details') or {}
|
||||
vmid = details.get('id', '')
|
||||
if vmid:
|
||||
health_persistence.check_vm_running(vmid)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
issues = []
|
||||
vm_details = {}
|
||||
|
||||
@@ -1470,11 +1584,15 @@ class HealthMonitor:
|
||||
else:
|
||||
reason = 'Container error'
|
||||
|
||||
issues.append(f'CT {ctid}: {reason}')
|
||||
ct_name = self._resolve_vm_name(ctid)
|
||||
display = f"CT {ctid} ({ct_name})" if ct_name else f"CT {ctid}"
|
||||
full_reason = f'{display}: {reason}\n{line.strip()[:200]}'
|
||||
issues.append(f'{display}: {reason}')
|
||||
vm_details[key] = {
|
||||
'status': 'WARNING' if 'device' in reason.lower() else 'CRITICAL',
|
||||
'reason': reason,
|
||||
'reason': full_reason,
|
||||
'id': ctid,
|
||||
'vmname': ct_name,
|
||||
'type': 'CT'
|
||||
}
|
||||
continue
|
||||
@@ -1509,11 +1627,15 @@ class HealthMonitor:
|
||||
vmid = id_match.group(1)
|
||||
key = f'vmct_{vmid}'
|
||||
if key not in vm_details:
|
||||
issues.append(f'VM/CT {vmid}: Failed to start')
|
||||
vm_name = self._resolve_vm_name(vmid)
|
||||
display = f"VM/CT {vmid} ({vm_name})" if vm_name else f"VM/CT {vmid}"
|
||||
full_reason = f'{display}: Failed to start\n{line.strip()[:200]}'
|
||||
issues.append(f'{display}: Failed to start')
|
||||
vm_details[key] = {
|
||||
'status': 'CRITICAL',
|
||||
'reason': 'Failed to start',
|
||||
'reason': full_reason,
|
||||
'id': vmid,
|
||||
'vmname': vm_name,
|
||||
'type': 'VM/CT'
|
||||
}
|
||||
|
||||
@@ -1661,20 +1783,25 @@ class HealthMonitor:
|
||||
vm_type = 'VM/CT'
|
||||
|
||||
if error_key not in vm_details:
|
||||
reason = 'Failed to start'
|
||||
vm_name = self._resolve_vm_name(vmid_ctid)
|
||||
display = f"{vm_type} {vmid_ctid}"
|
||||
if vm_name:
|
||||
display = f"{vm_type} {vmid_ctid} ({vm_name})"
|
||||
reason = f'{display}: Failed to start\n{line.strip()[:200]}'
|
||||
# Record persistent error
|
||||
health_persistence.record_error(
|
||||
error_key=error_key,
|
||||
category='vms',
|
||||
severity='CRITICAL',
|
||||
reason=reason,
|
||||
details={'id': vmid_ctid, 'type': vm_type}
|
||||
details={'id': vmid_ctid, 'vmname': vm_name, 'type': vm_type}
|
||||
)
|
||||
issues.append(f'{vm_type} {vmid_ctid}: {reason}')
|
||||
issues.append(f'{display}: Failed to start')
|
||||
vm_details[error_key] = {
|
||||
'status': 'CRITICAL',
|
||||
'reason': reason,
|
||||
'id': vmid_ctid,
|
||||
'vmname': vm_name,
|
||||
'type': vm_type
|
||||
}
|
||||
|
||||
@@ -1979,6 +2106,18 @@ class HealthMonitor:
|
||||
if recent_count >= 5 and recent_count >= prev_count * 4:
|
||||
spike_errors[pattern] = recent_count
|
||||
|
||||
# Helper: get human-readable samples from normalized patterns
|
||||
def _get_samples(error_dict, max_items=3):
    """Build a short list of human-readable sample lines for error patterns.

    Takes the first ``max_items`` keys of ``error_dict``, looks each one up
    in ``self.persistent_log_patterns`` (from the enclosing method's scope)
    and uses its stored 'sample' text, falling back to the pattern key
    itself when no entry or no sample exists. A leading syslog-style
    prefix (e.g. "Feb 27 16:03:35 host ") is removed and each line is
    capped at 120 characters.
    """
    selected_patterns = list(error_dict.keys())[:max_items]
    readable = []
    for key in selected_patterns:
        entry = self.persistent_log_patterns.get(key, {})
        raw_text = entry.get('sample', key)
        # Drop the "<Mon> <day> <time> <host> " prefix when the line has one.
        stripped = re.sub(r'^[A-Z][a-z]{2}\s+\d+\s+[\d:]+\s+\S+\s+', '', raw_text)
        readable.append(stripped[:120])
    return readable
|
||||
|
||||
persistent_errors = {}
|
||||
for pattern, data in self.persistent_log_patterns.items():
|
||||
time_span = current_time - data['first_seen']
|
||||
@@ -2018,31 +2157,38 @@ class HealthMonitor:
|
||||
# Get a representative critical error reason
|
||||
representative_error = next(iter(critical_errors_found.values()))
|
||||
reason = f'Critical error detected: {representative_error[:100]}'
|
||||
elif cascade_count > 0:
|
||||
status = 'WARNING'
|
||||
reason = f'Error cascade detected: {cascade_count} pattern(s) repeating ≥15 times in 3min'
|
||||
elif spike_count > 0:
|
||||
status = 'WARNING'
|
||||
reason = f'Error spike detected: {spike_count} pattern(s) increased 4x'
|
||||
elif persistent_count > 0:
|
||||
status = 'WARNING'
|
||||
reason = f'Persistent errors: {persistent_count} pattern(s) recurring over 15+ minutes'
|
||||
elif cascade_count > 0:
|
||||
status = 'WARNING'
|
||||
samples = _get_samples(cascading_errors, 3)
|
||||
reason = f'Error cascade ({cascade_count} patterns repeating):\n' + '\n'.join(f' - {s}' for s in samples)
|
||||
elif spike_count > 0:
|
||||
status = 'WARNING'
|
||||
samples = _get_samples(spike_errors, 3)
|
||||
reason = f'Error spike ({spike_count} patterns with 4x increase):\n' + '\n'.join(f' - {s}' for s in samples)
|
||||
elif persistent_count > 0:
|
||||
status = 'WARNING'
|
||||
samples = _get_samples(persistent_errors, 3)
|
||||
reason = f'Persistent errors ({persistent_count} patterns over 15+ min):\n' + '\n'.join(f' - {s}' for s in samples)
|
||||
else:
|
||||
# No significant issues found
|
||||
status = 'OK'
|
||||
reason = None
|
||||
|
||||
# Record/clear persistent errors for each log sub-check so Dismiss works
|
||||
log_sub_checks = {
|
||||
'log_error_cascade': {'active': cascade_count > 0, 'severity': 'WARNING',
|
||||
'reason': f'{cascade_count} pattern(s) repeating >=15 times'},
|
||||
'log_error_spike': {'active': spike_count > 0, 'severity': 'WARNING',
|
||||
'reason': f'{spike_count} pattern(s) with 4x increase'},
|
||||
'log_persistent_errors': {'active': persistent_count > 0, 'severity': 'WARNING',
|
||||
'reason': f'{persistent_count} recurring pattern(s) over 15+ min'},
|
||||
'log_critical_errors': {'active': unique_critical_count > 0, 'severity': 'CRITICAL',
|
||||
'reason': f'{unique_critical_count} critical error(s) found', 'dismissable': False},
|
||||
}
|
||||
cascade_samples = _get_samples(cascading_errors, 2) if cascade_count else []
|
||||
spike_samples = _get_samples(spike_errors, 2) if spike_count else []
|
||||
persist_samples = _get_samples(persistent_errors, 2) if persistent_count else []
|
||||
|
||||
log_sub_checks = {
|
||||
'log_error_cascade': {'active': cascade_count > 0, 'severity': 'WARNING',
|
||||
'reason': f'{cascade_count} pattern(s) repeating >=15 times:\n' + '\n'.join(f' - {s}' for s in cascade_samples) if cascade_count else ''},
|
||||
'log_error_spike': {'active': spike_count > 0, 'severity': 'WARNING',
|
||||
'reason': f'{spike_count} pattern(s) with 4x increase:\n' + '\n'.join(f' - {s}' for s in spike_samples) if spike_count else ''},
|
||||
'log_persistent_errors': {'active': persistent_count > 0, 'severity': 'WARNING',
|
||||
'reason': f'{persistent_count} recurring pattern(s) over 15+ min:\n' + '\n'.join(f' - {s}' for s in persist_samples) if persistent_count else ''},
|
||||
'log_critical_errors': {'active': unique_critical_count > 0, 'severity': 'CRITICAL',
|
||||
'reason': f'{unique_critical_count} critical error(s) found', 'dismissable': False},
|
||||
}
|
||||
|
||||
# Track which sub-checks were dismissed
|
||||
dismissed_keys = set()
|
||||
|
||||
@@ -25,12 +25,8 @@ from pathlib import Path
|
||||
class HealthPersistence:
|
||||
"""Manages persistent health error tracking"""
|
||||
|
||||
# Error retention periods (seconds)
|
||||
VM_ERROR_RETENTION = 48 * 3600 # 48 hours
|
||||
LOG_ERROR_RETENTION = 24 * 3600 # 24 hours
|
||||
DISK_ERROR_RETENTION = 48 * 3600 # 48 hours
|
||||
|
||||
# Default suppression: 24 hours (user can change per-category in settings)
|
||||
# Default suppression duration when no user setting exists for a category.
|
||||
# Users can override per-category via the Suppression Duration settings.
|
||||
DEFAULT_SUPPRESSION_HOURS = 24
|
||||
|
||||
# Mapping from error categories to settings keys
|
||||
@@ -496,32 +492,51 @@ class HealthPersistence:
|
||||
cursor = conn.cursor()
|
||||
|
||||
now = datetime.now()
|
||||
now_iso = now.isoformat()
|
||||
|
||||
# Delete resolved errors older than 7 days
|
||||
cutoff_resolved = (now - timedelta(days=7)).isoformat()
|
||||
cursor.execute('DELETE FROM errors WHERE resolved_at < ?', (cutoff_resolved,))
|
||||
|
||||
# Auto-resolve VM/CT errors older than 48h
|
||||
cutoff_vm = (now - timedelta(seconds=self.VM_ERROR_RETENTION)).isoformat()
|
||||
cursor.execute('''
|
||||
UPDATE errors
|
||||
SET resolved_at = ?
|
||||
WHERE category = 'vms'
|
||||
AND resolved_at IS NULL
|
||||
AND first_seen < ?
|
||||
AND acknowledged = 0
|
||||
''', (now.isoformat(), cutoff_vm))
|
||||
# ── Auto-resolve stale errors using user-configured Suppression Duration ──
|
||||
# Read the per-category suppression hours from user_settings.
|
||||
# If the user hasn't configured a category, fall back to DEFAULT_SUPPRESSION_HOURS.
|
||||
# This is the SINGLE source of truth for auto-resolution timing.
|
||||
user_settings = {}
|
||||
try:
|
||||
cursor.execute('SELECT setting_key, setting_value FROM user_settings WHERE setting_key LIKE ?', ('suppress_%',))
|
||||
for row in cursor.fetchall():
|
||||
user_settings[row[0]] = row[1]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Auto-resolve log errors older than 24h
|
||||
cutoff_logs = (now - timedelta(seconds=self.LOG_ERROR_RETENTION)).isoformat()
|
||||
for category, setting_key in self.CATEGORY_SETTING_MAP.items():
|
||||
stored = user_settings.get(setting_key)
|
||||
try:
|
||||
hours = int(stored) if stored else self.DEFAULT_SUPPRESSION_HOURS
|
||||
except (ValueError, TypeError):
|
||||
hours = self.DEFAULT_SUPPRESSION_HOURS
|
||||
|
||||
cutoff = (now - timedelta(hours=hours)).isoformat()
|
||||
cursor.execute('''
|
||||
UPDATE errors
|
||||
SET resolved_at = ?
|
||||
WHERE category = ?
|
||||
AND resolved_at IS NULL
|
||||
AND last_seen < ?
|
||||
AND acknowledged = 0
|
||||
''', (now_iso, category, cutoff))
|
||||
|
||||
# Catch-all: auto-resolve ANY error from an unmapped category
|
||||
# whose last_seen exceeds DEFAULT_SUPPRESSION_HOURS.
|
||||
fallback_cutoff = (now - timedelta(hours=self.DEFAULT_SUPPRESSION_HOURS)).isoformat()
|
||||
cursor.execute('''
|
||||
UPDATE errors
|
||||
UPDATE errors
|
||||
SET resolved_at = ?
|
||||
WHERE category = 'logs'
|
||||
AND resolved_at IS NULL
|
||||
AND first_seen < ?
|
||||
WHERE resolved_at IS NULL
|
||||
AND acknowledged = 0
|
||||
''', (now.isoformat(), cutoff_logs))
|
||||
AND last_seen < ?
|
||||
''', (now_iso, fallback_cutoff))
|
||||
|
||||
# Delete old events (>30 days)
|
||||
cutoff_events = (now - timedelta(days=30)).isoformat()
|
||||
|
||||
@@ -266,17 +266,24 @@ class JournalWatcher:
|
||||
if re.search(noise, msg, re.IGNORECASE):
|
||||
return
|
||||
|
||||
# NOTE: Disk I/O errors (ATA, SCSI, blk_update_request) are NOT handled
|
||||
# here. They are detected exclusively by HealthMonitor._check_disks_optimized
|
||||
# which records to health_persistence -> PollingCollector -> notification.
|
||||
# This avoids duplicate notifications and ensures the health dashboard
|
||||
# stays in sync with notifications.
|
||||
# Filesystem errors (EXT4/BTRFS/XFS/ZFS) ARE handled here because they
|
||||
# indicate corruption, not just hardware I/O problems.
|
||||
|
||||
critical_patterns = {
|
||||
r'kernel panic': ('system_problem', 'CRITICAL', 'Kernel panic'),
|
||||
r'Out of memory': ('system_problem', 'CRITICAL', 'Out of memory killer activated'),
|
||||
r'segfault': ('system_problem', 'WARNING', 'Segmentation fault detected'),
|
||||
r'BUG:': ('system_problem', 'CRITICAL', 'Kernel BUG detected'),
|
||||
r'Call Trace:': ('system_problem', 'WARNING', 'Kernel call trace'),
|
||||
r'I/O error.*dev\s+(\S+)': ('disk_io_error', 'CRITICAL', 'Disk I/O error'),
|
||||
r'EXT4-fs error': ('disk_io_error', 'CRITICAL', 'Filesystem error'),
|
||||
r'BTRFS error': ('disk_io_error', 'CRITICAL', 'Filesystem error'),
|
||||
r'XFS.*error': ('disk_io_error', 'CRITICAL', 'Filesystem error'),
|
||||
r'ZFS.*error': ('disk_io_error', 'CRITICAL', 'ZFS pool error'),
|
||||
r'EXT4-fs error': ('system_problem', 'CRITICAL', 'Filesystem error'),
|
||||
r'BTRFS error': ('system_problem', 'CRITICAL', 'Filesystem error'),
|
||||
r'XFS.*error': ('system_problem', 'CRITICAL', 'Filesystem error'),
|
||||
r'ZFS.*error': ('system_problem', 'CRITICAL', 'ZFS pool error'),
|
||||
r'mce:.*Hardware Error': ('system_problem', 'CRITICAL', 'Hardware error (MCE)'),
|
||||
}
|
||||
|
||||
@@ -286,11 +293,9 @@ class JournalWatcher:
|
||||
entity_id = ''
|
||||
|
||||
# Build a context-rich reason from the journal message.
|
||||
# The raw msg contains process name, PID, addresses, library, etc.
|
||||
enriched = reason
|
||||
|
||||
if 'segfault' in pattern:
|
||||
# Kernel segfault: "process[PID]: segfault at ADDR ... in lib.so"
|
||||
m = re.search(r'(\S+)\[(\d+)\].*segfault', msg)
|
||||
proc_name = m.group(1) if m else ''
|
||||
proc_pid = m.group(2) if m else ''
|
||||
@@ -305,30 +310,17 @@ class JournalWatcher:
|
||||
enriched = '\n'.join(parts)
|
||||
|
||||
elif 'Out of memory' in pattern:
|
||||
# OOM: "Out of memory: Killed process PID (name)"
|
||||
m = re.search(r'Killed process\s+(\d+)\s+\(([^)]+)\)', msg)
|
||||
if m:
|
||||
enriched = f"{reason}\nKilled: {m.group(2)} (PID {m.group(1)})"
|
||||
else:
|
||||
enriched = f"{reason}\n{msg[:300]}"
|
||||
|
||||
elif event_type == 'disk_io_error':
|
||||
# Include device and raw message for disk/fs errors
|
||||
dev_match = re.search(r'dev\s+(\S+)', msg)
|
||||
if dev_match:
|
||||
entity = 'disk'
|
||||
entity_id = dev_match.group(1)
|
||||
enriched = f"{reason}\nDevice: {dev_match.group(1)}"
|
||||
else:
|
||||
enriched = f"{reason}\n{msg[:300]}"
|
||||
|
||||
else:
|
||||
# Generic: include the raw journal message for context
|
||||
enriched = f"{reason}\n{msg[:300]}"
|
||||
|
||||
data = {'reason': enriched, 'hostname': self._hostname}
|
||||
if entity == 'disk':
|
||||
data['device'] = entity_id
|
||||
|
||||
self._emit(event_type, severity, data, entity=entity, entity_id=entity_id)
|
||||
return
|
||||
@@ -466,19 +458,65 @@ class JournalWatcher:
|
||||
}, entity='cluster', entity_id=node_name)
|
||||
|
||||
def _check_system_shutdown(self, msg: str, syslog_id: str):
|
||||
"""Detect system shutdown/reboot."""
|
||||
if 'systemd-journald' in syslog_id or 'systemd' in syslog_id:
|
||||
if 'Journal stopped' in msg or 'Stopping Journal Service' in msg:
|
||||
self._emit('system_shutdown', 'WARNING', {
|
||||
'reason': 'System journal stopped',
|
||||
'hostname': self._hostname,
|
||||
}, entity='node', entity_id='')
|
||||
elif 'Shutting down' in msg or 'System is rebooting' in msg:
|
||||
event = 'system_reboot' if 'reboot' in msg.lower() else 'system_shutdown'
|
||||
self._emit(event, 'WARNING', {
|
||||
'reason': msg[:200],
|
||||
'hostname': self._hostname,
|
||||
}, entity='node', entity_id='')
|
||||
"""Detect system shutdown/reboot.
|
||||
|
||||
Matches multiple systemd signals that indicate the node is going down:
|
||||
- "Shutting down." (systemd PID 1)
|
||||
- "System is powering off." / "System is rebooting."
|
||||
- "Reached target Shutdown." / "Reached target Reboot."
|
||||
- "Journal stopped" (very late in shutdown)
|
||||
- "The system will reboot now!" / "The system will power off now!"
|
||||
"""
|
||||
msg_lower = msg.lower()
|
||||
|
||||
# Only process systemd / logind messages
|
||||
if not any(s in syslog_id for s in ('systemd', 'logind', '')):
|
||||
if 'systemd' not in msg_lower:
|
||||
return
|
||||
|
||||
is_reboot = False
|
||||
is_shutdown = False
|
||||
|
||||
# Detect reboot signals
|
||||
reboot_signals = [
|
||||
'system is rebooting',
|
||||
'reached target reboot',
|
||||
'the system will reboot now',
|
||||
'starting reboot',
|
||||
]
|
||||
for sig in reboot_signals:
|
||||
if sig in msg_lower:
|
||||
is_reboot = True
|
||||
break
|
||||
|
||||
# Detect shutdown/poweroff signals
|
||||
if not is_reboot:
|
||||
shutdown_signals = [
|
||||
'system is powering off',
|
||||
'system is halting',
|
||||
'shutting down',
|
||||
'reached target shutdown',
|
||||
'reached target halt',
|
||||
'the system will power off now',
|
||||
'starting power-off',
|
||||
'journal stopped',
|
||||
'stopping journal service',
|
||||
]
|
||||
for sig in shutdown_signals:
|
||||
if sig in msg_lower:
|
||||
is_shutdown = True
|
||||
break
|
||||
|
||||
if is_reboot:
|
||||
self._emit('system_reboot', 'CRITICAL', {
|
||||
'reason': msg[:200],
|
||||
'hostname': self._hostname,
|
||||
}, entity='node', entity_id='')
|
||||
elif is_shutdown:
|
||||
self._emit('system_shutdown', 'CRITICAL', {
|
||||
'reason': msg[:200],
|
||||
'hostname': self._hostname,
|
||||
}, entity='node', entity_id='')
|
||||
|
||||
def _check_permission_change(self, msg: str, syslog_id: str):
|
||||
"""Detect user permission changes in PVE."""
|
||||
@@ -557,7 +595,8 @@ class TaskWatcher:
|
||||
'qmreset': ('vm_restart', 'INFO'),
|
||||
'vzstart': ('ct_start', 'INFO'),
|
||||
'vzstop': ('ct_stop', 'INFO'),
|
||||
'vzshutdown': ('ct_stop', 'INFO'),
|
||||
'vzshutdown': ('ct_shutdown', 'INFO'),
|
||||
'vzreboot': ('ct_restart', 'INFO'),
|
||||
'vzdump': ('backup_start', 'INFO'),
|
||||
'qmsnapshot': ('snapshot_complete', 'INFO'),
|
||||
'vzsnapshot': ('snapshot_complete', 'INFO'),
|
||||
@@ -741,7 +780,7 @@ class TaskWatcher:
|
||||
# These are backup-induced operations (mode=stop), not user actions.
|
||||
# Exception: if a VM/CT FAILS to start after backup, that IS important.
|
||||
_BACKUP_NOISE = {'vm_start', 'vm_stop', 'vm_shutdown', 'vm_restart',
|
||||
'ct_start', 'ct_stop'}
|
||||
'ct_start', 'ct_stop', 'ct_shutdown', 'ct_restart'}
|
||||
if event_type in _BACKUP_NOISE and not is_error:
|
||||
if self._is_vzdump_active():
|
||||
return
|
||||
|
||||
@@ -661,6 +661,23 @@ class NotificationManager:
|
||||
if event.event_type in _ALWAYS_DELIVER and cooldown_str is None:
|
||||
cooldown = 10
|
||||
|
||||
# VM/CT state changes are real user actions that should always be
|
||||
# delivered. Each start/stop/shutdown is a distinct event. A 5s
|
||||
# cooldown prevents exact duplicates from concurrent watchers.
|
||||
_STATE_EVENTS = {
|
||||
'vm_start', 'vm_stop', 'vm_shutdown', 'vm_restart',
|
||||
'ct_start', 'ct_stop', 'ct_shutdown', 'ct_restart',
|
||||
'vm_fail', 'ct_fail',
|
||||
}
|
||||
if event.event_type in _STATE_EVENTS and cooldown_str is None:
|
||||
cooldown = 5
|
||||
|
||||
# System shutdown/reboot must be delivered immediately -- the node
|
||||
# is going down and there may be only seconds to send the message.
|
||||
_URGENT_EVENTS = {'system_shutdown', 'system_reboot'}
|
||||
if event.event_type in _URGENT_EVENTS and cooldown_str is None:
|
||||
cooldown = 5
|
||||
|
||||
# Check against last sent time using stable fingerprint
|
||||
last_sent = self._cooldowns.get(event.fingerprint, 0)
|
||||
|
||||
|
||||
@@ -373,6 +373,18 @@ TEMPLATES = {
|
||||
'group': 'vm_ct',
|
||||
'default_enabled': False,
|
||||
},
|
||||
'ct_shutdown': {
|
||||
'title': '{hostname}: CT {vmid} shutdown',
|
||||
'body': '{vmname} ({vmid}) has been shut down.',
|
||||
'group': 'vm_ct',
|
||||
'default_enabled': False,
|
||||
},
|
||||
'ct_restart': {
|
||||
'title': '{hostname}: CT {vmid} restarted',
|
||||
'body': '{vmname} ({vmid}) has been restarted.',
|
||||
'group': 'vm_ct',
|
||||
'default_enabled': False,
|
||||
},
|
||||
'ct_fail': {
|
||||
'title': '{hostname}: CT {vmid} FAILED',
|
||||
'body': '{vmname} ({vmid}) has failed.\n{reason}',
|
||||
@@ -469,7 +481,7 @@ TEMPLATES = {
|
||||
},
|
||||
'disk_io_error': {
|
||||
'title': '{hostname}: Disk I/O error',
|
||||
'body': 'I/O error detected on {device}.\n{reason}',
|
||||
'body': '{reason}',
|
||||
'group': 'storage',
|
||||
'default_enabled': True,
|
||||
},
|
||||
@@ -730,6 +742,7 @@ def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
'security_count': '0', 'total_count': '0', 'package_list': '',
|
||||
'packages': '', 'pve_packages': '', 'version': '',
|
||||
'issue_list': '', 'error_key': '',
|
||||
'storage_name': '', 'storage_type': '',
|
||||
}
|
||||
variables.update(data)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user