Update health monitor

MacRimi
2026-02-16 15:48:41 +01:00
parent dcbc52efc6
commit 2ee5be7402
5 changed files with 1153 additions and 324 deletions


@@ -373,6 +373,44 @@ class HealthMonitor:
overall = 'OK'
summary = 'All systems operational'
# --- Emit events for state changes (Block A: Notification prep) ---
try:
previous_overall = getattr(self, '_last_overall_status', None)
if previous_overall and previous_overall != overall:
# Overall status changed - emit event
health_persistence.emit_event(
event_type='state_change',
category='overall',
severity=overall,
data={
'previous': previous_overall,
'current': overall,
'summary': summary
}
)
# Track per-category state changes
previous_details = getattr(self, '_last_category_statuses', {})
for cat_key, cat_data in details.items():
cat_status = cat_data.get('status', 'OK')
prev_status = previous_details.get(cat_key, 'OK')
if prev_status != cat_status and cat_status in ('WARNING', 'CRITICAL'):
health_persistence.emit_event(
event_type='state_change',
category=cat_key,
severity=cat_status,
data={
'previous': prev_status,
'current': cat_status,
'reason': cat_data.get('reason', '')
}
)
self._last_overall_status = overall
self._last_category_statuses = {k: v.get('status', 'OK') for k, v in details.items()}
except Exception:
pass # Event emission should never break health checks
return {
'overall': overall,
'summary': summary,
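The state-change emission above relies on only four calls into health_persistence (emit_event, record_error, is_error_active, clear_error). The real module is outside this diff, so as a sketch, an in-memory stand-in with the same keyword signatures is enough to exercise the logic in isolation:

class _StubHealthPersistence:
    """In-memory sketch mirroring the health_persistence calls used by HealthMonitor."""

    def __init__(self):
        self.events = []
        self.active_errors = {}

    def emit_event(self, event_type, category, severity, data):
        # Same keyword interface as the emit_event() calls above.
        self.events.append({'event_type': event_type, 'category': category,
                            'severity': severity, 'data': data})

    def record_error(self, error_key, category, severity, reason, details=None):
        self.active_errors[error_key] = {'category': category, 'severity': severity,
                                         'reason': reason, 'details': details or {}}

    def is_error_active(self, error_key):
        return error_key in self.active_errors

    def clear_error(self, error_key):
        self.active_errors.pop(error_key, None)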
@@ -445,6 +483,30 @@ class HealthMonitor:
result['status'] = 'WARNING'
result['reason'] = temp_status.get('reason')
# Build checks dict for frontend expandable section
checks = {
'cpu_usage': {
'status': status,
'detail': f'{round(cpu_percent, 1)}% ({psutil.cpu_count()} cores)',
'value': round(cpu_percent, 1),
'thresholds': f'Warning >{self.CPU_WARNING}%, Critical >{self.CPU_CRITICAL}%'
}
}
if temp_status and temp_status.get('status') != 'UNKNOWN':
temp_val = temp_status.get('value', 'N/A')
checks['cpu_temperature'] = {
'status': temp_status.get('status', 'OK'),
'detail': f'{temp_val}°C' if isinstance(temp_val, (int, float)) else str(temp_val),
'value': temp_val,
'thresholds': 'Warning >80°C sustained >3min'
}
else:
checks['cpu_temperature'] = {
'status': 'OK',
'detail': 'Sensor not available',
}
result['checks'] = checks
return result
except Exception as e:
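The new 'checks' dict is the payload the frontend's expandable section renders per category. For the CPU check it serializes to something like the following sketch (values and threshold numbers are illustrative, not taken from this diff):

cpu_checks_example = {
    'cpu_usage': {
        'status': 'OK',
        'detail': '12.5% (16 cores)',                 # illustrative values
        'value': 12.5,
        'thresholds': 'Warning >85%, Critical >95%',  # assumes CPU_WARNING=85, CPU_CRITICAL=95
    },
    'cpu_temperature': {
        'status': 'OK',
        'detail': '54°C',
        'value': 54,
        'thresholds': 'Warning >80°C sustained >3min',
    },
}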
@@ -617,12 +679,35 @@ class HealthMonitor:
status = 'OK'
reason = None
ram_avail_gb = round(memory.available / (1024**3), 2)
ram_total_gb = round(memory.total / (1024**3), 2)
swap_used_gb = round(swap.used / (1024**3), 2)
swap_total_gb = round(swap.total / (1024**3), 2)
# Determine per-sub-check status
ram_status = 'CRITICAL' if mem_percent >= 90 and mem_critical >= 2 else ('WARNING' if mem_percent >= self.MEMORY_WARNING and mem_warning >= 2 else 'OK')
swap_status = 'CRITICAL' if swap_critical >= 2 else 'OK'
result = {
'status': status,
'ram_percent': round(mem_percent, 1),
'ram_available_gb': round(memory.available / (1024**3), 2),
'ram_available_gb': ram_avail_gb,
'swap_percent': round(swap_percent, 1),
'swap_used_gb': round(swap.used / (1024**3), 2)
'swap_used_gb': swap_used_gb,
'checks': {
'ram_usage': {
'status': ram_status,
'detail': f'{round(mem_percent, 1)}% used ({ram_avail_gb} GB free of {ram_total_gb} GB)',
'value': round(mem_percent, 1),
'thresholds': f'Warning >{self.MEMORY_WARNING}%, Critical >90%'
},
'swap_usage': {
'status': swap_status,
'detail': f'{round(swap_percent, 1)}% used ({swap_used_gb} GB of {swap_total_gb} GB)' if swap.total > 0 else 'No swap configured',
'value': round(swap_percent, 1),
'thresholds': 'Critical when swap >20% of RAM'
}
}
}
if reason:
@@ -706,8 +791,28 @@ class HealthMonitor:
issues.append(f"LVM check: {lvm_status.get('reason')}")
storage_details['lvm_check'] = lvm_status
# Check dmesg for real-time I/O errors (dmesg-based, complements journalctl SMART checks)
dmesg_io_result = self._check_disks_optimized()
if dmesg_io_result.get('status') != 'OK':
dmesg_details = dmesg_io_result.get('details', {})
for disk_path, disk_info in dmesg_details.items():
if disk_path not in storage_details or storage_details[disk_path].get('status') == 'OK':
issues.append(f'{disk_path}: {disk_info.get("reason", "I/O errors")}')
storage_details[disk_path] = disk_info
# Build checks dict from storage_details, adding OK entries for items with no issues
checks = {}
for key, val in storage_details.items():
checks[key] = {
'status': val.get('status', 'OK'),
'detail': val.get('reason', 'OK'),
**{k: v for k, v in val.items() if k not in ('status', 'reason')}
}
if not issues:
return {'status': 'OK'}
# Add a summary OK entry if nothing specific
checks['root_filesystem'] = checks.get('/', {'status': 'OK', 'detail': 'Root filesystem healthy'})
return {'status': 'OK', 'checks': checks}
# Determine overall status
has_critical = any(d.get('status') == 'CRITICAL' for d in storage_details.values())
@@ -715,7 +820,8 @@ class HealthMonitor:
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': storage_details
'details': storage_details,
'checks': checks
}
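The merge above gives journalctl/SMART-derived storage_details precedence; dmesg I/O findings only fill entries that are missing or still OK. A condensed sketch of that rule with hypothetical data:

storage_details = {'/dev/sda': {'status': 'WARNING', 'reason': 'SMART reallocated sectors'}}
dmesg_details = {
    '/dev/sda': {'status': 'CRITICAL', 'reason': 'I/O error'},
    '/dev/sdb': {'status': 'WARNING', 'reason': 'I/O error'},
}
for disk_path, disk_info in dmesg_details.items():
    if disk_path not in storage_details or storage_details[disk_path].get('status') == 'OK':
        storage_details[disk_path] = disk_info
# Result: /dev/sda keeps its SMART entry, /dev/sdb is added from dmesg.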
def _check_filesystem(self, mount_point: str) -> Dict[str, Any]:
@@ -1025,19 +1131,42 @@ class HealthMonitor:
# Check connectivity (latency)
latency_status = self._check_network_latency()
if latency_status and latency_status.get('status') not in ['OK', 'INFO', 'UNKNOWN']:
issues.append(latency_status.get('reason', 'Network latency issue'))
if latency_status:
latency_ms = latency_status.get('latency_ms', 'N/A')
latency_sev = latency_status.get('status', 'OK')
interface_details['connectivity'] = latency_status
connectivity_check = {
'status': latency_sev if latency_sev not in ['UNKNOWN'] else 'OK',
'detail': f'Latency {latency_ms}ms to 1.1.1.1' if isinstance(latency_ms, (int, float)) else latency_status.get('reason', 'Unknown'),
}
if latency_sev not in ['OK', 'INFO', 'UNKNOWN']:
issues.append(latency_status.get('reason', 'Network latency issue'))
else:
connectivity_check = {'status': 'OK', 'detail': 'Not tested'}
# Build checks dict
checks = {}
for iface in active_interfaces:
checks[iface] = {'status': 'OK', 'detail': 'UP'}
for iface, detail in interface_details.items():
if iface != 'connectivity':
checks[iface] = {
'status': detail.get('status', 'OK'),
'detail': detail.get('reason', 'DOWN'),
'dismissable': detail.get('dismissable', False)
}
checks['connectivity'] = connectivity_check
if not issues:
return {'status': 'OK'}
return {'status': 'OK', 'checks': checks}
has_critical = any(d.get('status') == 'CRITICAL' for d in interface_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:2]),
'details': interface_details
'details': interface_details,
'checks': checks
}
except Exception:
@@ -1348,26 +1477,51 @@ class HealthMonitor:
'type': vm_type
}
# Build checks dict from vm_details
checks = {}
for key, val in vm_details.items():
vm_label = f"{val.get('type', 'VM')} {val.get('id', key)}"
checks[vm_label] = {
'status': val.get('status', 'WARNING'),
'detail': val.get('reason', 'Error'),
'dismissable': True
}
if not issues:
return {'status': 'OK'}
checks['all_vms_cts'] = {'status': 'OK', 'detail': 'No issues detected in logs'}
return {'status': 'OK', 'checks': checks}
has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': vm_details
'details': vm_details,
'checks': checks
}
except Exception:
return {'status': 'OK'}
return {'status': 'OK', 'checks': {}}
def _check_pve_services(self) -> Dict[str, Any]:
"""Check critical Proxmox services"""
"""
Check critical Proxmox services with persistence tracking.
- Checks the base PVE_SERVICES list
- Dynamically adds corosync if a cluster config exists
- Records failed services in persistence for tracking/dismiss
- Auto-clears when services recover
"""
try:
failed_services = []
# Build service list: base PVE services + corosync if clustered
services_to_check = list(self.PVE_SERVICES)
is_cluster = os.path.exists('/etc/corosync/corosync.conf')
if is_cluster and 'corosync' not in services_to_check:
services_to_check.append('corosync')
for service in self.PVE_SERVICES:
failed_services = []
service_details = {}
for service in services_to_check:
try:
result = subprocess.run(
['systemctl', 'is-active', service],
@@ -1376,23 +1530,79 @@ class HealthMonitor:
timeout=2
)
if result.returncode != 0 or result.stdout.strip() != 'active':
status = result.stdout.strip()
if result.returncode != 0 or status != 'active':
failed_services.append(service)
service_details[service] = status or 'inactive'
except Exception:
# If systemctl fails (e.g., command not found or service doesn't exist), treat as failed
failed_services.append(service)
service_details[service] = 'error'
if failed_services:
return {
'status': 'CRITICAL',
'reason': f'Services inactive: {", ".join(failed_services)}',
'failed': failed_services
# Build checks dict with status per service
checks = {}
for svc in services_to_check:
if svc in failed_services:
state = service_details.get(svc, 'inactive')
checks[svc] = {
'status': 'CRITICAL',
'detail': f'Service is {state}',
}
else:
checks[svc] = {
'status': 'OK',
'detail': 'Active',
}
if is_cluster:
checks['cluster_mode'] = {
'status': 'OK',
'detail': 'Cluster detected (corosync.conf present)',
}
return {'status': 'OK'}
if failed_services:
reason = f'Services inactive: {", ".join(failed_services)}'
# Record each failed service in persistence
for svc in failed_services:
error_key = f'pve_service_{svc}'
health_persistence.record_error(
error_key=error_key,
category='services',
severity='CRITICAL',
reason=f'PVE service {svc} is {service_details.get(svc, "inactive")}',
details={'service': svc, 'state': service_details.get(svc, 'inactive')}
)
# Auto-clear services that recovered
for svc in services_to_check:
if svc not in failed_services:
error_key = f'pve_service_{svc}'
if health_persistence.is_error_active(error_key):
health_persistence.clear_error(error_key)
return {
'status': 'CRITICAL',
'reason': reason,
'failed': failed_services,
'is_cluster': is_cluster,
'services_checked': len(services_to_check),
'checks': checks
}
# All OK - clear any previously tracked service errors
for svc in services_to_check:
error_key = f'pve_service_{svc}'
if health_persistence.is_error_active(error_key):
health_persistence.clear_error(error_key)
return {
'status': 'OK',
'is_cluster': is_cluster,
'services_checked': len(services_to_check),
'checks': checks
}
except Exception as e:
# If the entire systemctl check fails
return {
'status': 'WARNING',
'reason': f'Service check command failed: {str(e)}'
@@ -1620,7 +1830,31 @@ class HealthMonitor:
status = 'OK'
reason = None
log_result = {'status': status}
# Build checks dict for log sub-items
log_checks = {
'error_cascade': {
'status': 'WARNING' if cascade_count > 0 else 'OK',
'detail': f'{cascade_count} pattern(s) repeating >=15 times' if cascade_count > 0 else 'No cascading errors',
'dismissable': True
},
'error_spike': {
'status': 'WARNING' if spike_count > 0 else 'OK',
'detail': f'{spike_count} pattern(s) with 4x increase' if spike_count > 0 else 'No error spikes',
'dismissable': True
},
'persistent_errors': {
'status': 'WARNING' if persistent_count > 0 else 'OK',
'detail': f'{persistent_count} recurring pattern(s) over 15+ min' if persistent_count > 0 else 'No persistent patterns',
'dismissable': True
},
'critical_errors': {
'status': 'CRITICAL' if unique_critical_count > 0 else 'OK',
'detail': f'{unique_critical_count} critical error(s) found' if unique_critical_count > 0 else 'No critical errors',
'dismissable': True
}
}
log_result = {'status': status, 'checks': log_checks}
if reason:
log_result['reason'] = reason
@@ -1629,7 +1863,12 @@ class HealthMonitor:
return log_result
# If journalctl command failed or returned no data
ok_result = {'status': 'OK'}
ok_result = {'status': 'OK', 'checks': {
'error_cascade': {'status': 'OK', 'detail': 'No cascading errors'},
'error_spike': {'status': 'OK', 'detail': 'No error spikes'},
'persistent_errors': {'status': 'OK', 'detail': 'No persistent patterns'},
'critical_errors': {'status': 'OK', 'detail': 'No critical errors'}
}}
self.cached_results[cache_key] = ok_result
self.last_check_times[cache_key] = current_time
return ok_result
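The four log sub-checks above reduce to a simple severity mapping; a condensed reference sketch (key names mirror the checks dict):

def summarize_log_checks(cascade_count, spike_count, persistent_count, unique_critical_count):
    # Any unique critical pattern is CRITICAL; the other three degrade to WARNING when non-zero.
    return {
        'error_cascade': 'WARNING' if cascade_count > 0 else 'OK',
        'error_spike': 'WARNING' if spike_count > 0 else 'OK',
        'persistent_errors': 'WARNING' if persistent_count > 0 else 'OK',
        'critical_errors': 'CRITICAL' if unique_critical_count > 0 else 'OK',
    }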
@@ -1662,9 +1901,9 @@ class HealthMonitor:
def _check_updates(self) -> Optional[Dict[str, Any]]:
"""
Check for pending system updates.
- WARNING: If security updates are available.
- CRITICAL: If system not updated in >2 years.
- INFO: If 1-2 years without updates, or many non-security updates.
- WARNING: Security updates available, or system not updated >1 year (365 days).
- CRITICAL: System not updated >18 months (548 days).
- INFO: Kernel/PVE updates available, or >50 non-security updates pending.
"""
cache_key = 'updates_check'
current_time = time.time()
@@ -1730,12 +1969,12 @@ class HealthMonitor:
reason=reason,
details={'count': len(security_updates_packages), 'packages': security_updates_packages[:5]}
)
elif last_update_days and last_update_days >= 730:
# 2+ years without updates - CRITICAL
elif last_update_days and last_update_days >= 548:
# 18+ months without updates - CRITICAL
status = 'CRITICAL'
reason = f'System not updated in {last_update_days} days (>2 years)'
reason = f'System not updated in {last_update_days} days (>18 months)'
health_persistence.record_error(
error_key='updates_730days',
error_key='updates_548days',
category='updates',
severity='CRITICAL',
reason=reason,
@@ -1766,14 +2005,40 @@ class HealthMonitor:
status = 'WARNING'
reason = 'Failed to check for updates (apt-get error)'
# Build checks dict for updates sub-items
update_age_status = 'CRITICAL' if (last_update_days and last_update_days >= 548) else ('WARNING' if (last_update_days and last_update_days >= 365) else 'OK')
sec_status = 'WARNING' if security_updates_packages else 'OK'
kernel_status = 'INFO' if kernel_pve_updates_packages else 'OK'
checks = {
'security_updates': {
'status': sec_status,
'detail': f'{len(security_updates_packages)} security update(s) pending' if security_updates_packages else 'No security updates pending',
},
'system_age': {
'status': update_age_status,
'detail': f'Last updated {last_update_days} day(s) ago' if last_update_days is not None else 'Unknown',
'thresholds': 'Warning >365 days, Critical >548 days'
},
'pending_updates': {
'status': 'INFO' if update_count > 50 else 'OK',
'detail': f'{update_count} package(s) pending',
},
'kernel_pve': {
'status': kernel_status,
'detail': f'{len(kernel_pve_updates_packages)} kernel/PVE update(s)' if kernel_pve_updates_packages else 'Kernel/PVE up to date',
}
}
# Construct result dictionary
update_result = {
'status': status,
'count': update_count
'count': update_count,
'checks': checks
}
if reason:
update_result['reason'] = reason
if last_update_days is not None: # Only add if we could determine days_since_update
if last_update_days is not None:
update_result['days_since_update'] = last_update_days
self.cached_results[cache_key] = update_result
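The age thresholds documented above (WARNING at 365 days, CRITICAL at 548 days) boil down to a small mapping; a standalone sketch for reference:

def update_age_to_status(last_update_days):
    # Mirrors the documented thresholds: >=548 days CRITICAL, >=365 days WARNING, else OK.
    if last_update_days is None:
        return 'OK'
    if last_update_days >= 548:
        return 'CRITICAL'
    if last_update_days >= 365:
        return 'WARNING'
    return 'OK'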
@@ -1782,39 +2047,188 @@ class HealthMonitor:
except Exception as e:
print(f"[HealthMonitor] Error checking updates: {e}")
# Return OK on exception to avoid false alerts
return {'status': 'OK', 'count': 0}
return {'status': 'OK', 'count': 0, 'checks': {}}
def _check_fail2ban_bans(self) -> Dict[str, Any]:
"""
Check if fail2ban is installed and if there are currently banned IPs.
Cached for 60 seconds to avoid hammering fail2ban-client.
Returns:
{'installed': bool, 'active': bool, 'status': str, 'detail': str,
'banned_count': int, 'jails': [...], 'banned_ips': [...]}
"""
cache_key = 'fail2ban_bans'
current_time = time.time()
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key, {'installed': False, 'status': 'OK', 'detail': 'Not installed'})
result = {'installed': False, 'active': False, 'status': 'OK', 'detail': 'Not installed', 'banned_count': 0, 'jails': [], 'banned_ips': []}
try:
# Check if fail2ban-client exists
which_result = subprocess.run(
['which', 'fail2ban-client'],
capture_output=True, text=True, timeout=2
)
if which_result.returncode != 0:
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
result['installed'] = True
# Check if fail2ban service is active
active_check = subprocess.run(
['systemctl', 'is-active', 'fail2ban'],
capture_output=True, text=True, timeout=2
)
if active_check.stdout.strip() != 'active':
result['detail'] = 'Fail2Ban installed but service not active'
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
result['active'] = True
# Get list of active jails
jails_result = subprocess.run(
['fail2ban-client', 'status'],
capture_output=True, text=True, timeout=3
)
jails = []
if jails_result.returncode == 0:
for line in jails_result.stdout.split('\n'):
if 'Jail list:' in line:
jail_str = line.split('Jail list:')[1].strip()
jails = [j.strip() for j in jail_str.split(',') if j.strip()]
break
if not jails:
result['detail'] = 'Fail2Ban active, no jails configured'
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
result['jails'] = jails
# Check each jail for banned IPs
total_banned = 0
all_banned_ips = []
jails_with_bans = []
for jail in jails:
try:
jail_result = subprocess.run(
['fail2ban-client', 'status', jail],
capture_output=True, text=True, timeout=2
)
if jail_result.returncode == 0:
for line in jail_result.stdout.split('\n'):
if 'Currently banned:' in line:
try:
count = int(line.split('Currently banned:')[1].strip())
if count > 0:
total_banned += count
jails_with_bans.append(jail)
except (ValueError, IndexError):
pass
elif 'Banned IP list:' in line:
ips_str = line.split('Banned IP list:')[1].strip()
if ips_str:
ips = [ip.strip() for ip in ips_str.split() if ip.strip()]
all_banned_ips.extend(ips[:10]) # Limit to 10 IPs per jail
except Exception:
pass
result['banned_count'] = total_banned
result['banned_ips'] = all_banned_ips[:20] # Max 20 total
if total_banned > 0:
jails_str = ', '.join(jails_with_bans)
msg = f'{total_banned} IP(s) currently banned by Fail2Ban (jails: {jails_str})'
result['status'] = 'WARNING'
result['detail'] = msg
# Record in persistence (dismissable)
health_persistence.record_error(
error_key='security_fail2ban_ban',
category='security',
severity='WARNING',
reason=msg,
details={
'banned_count': total_banned,
'jails': jails_with_bans,
'banned_ips': all_banned_ips[:5],
'dismissable': True
}
)
else:
result['detail'] = f'Fail2Ban active ({len(jails)} jail(s), no current bans)'
# Auto-resolve if previously banned IPs are now gone
if health_persistence.is_error_active('security_fail2ban_ban'):
health_persistence.clear_error('security_fail2ban_ban')
except Exception as e:
result['detail'] = f'Unable to check Fail2Ban: {str(e)[:50]}'
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
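# Reference (not part of this diff): 'fail2ban-client status <jail>' output is
# typically shaped like the sample below; exact layout can vary by fail2ban
# version, so treat it as illustrative of what the line parsing above expects.
#
#   Status for the jail: sshd
#   |- Filter
#   |  |- Currently failed: 0
#   |  `- File list:        /var/log/auth.log
#   `- Actions
#      |- Currently banned: 1
#      |- Total banned:     3
#      `- Banned IP list:   192.0.2.10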
def _check_security(self) -> Dict[str, Any]:
"""
Check security-related items:
- Uptime > 1 year (indicates potential kernel vulnerability if not updated)
- SSL certificate expiration (non-INFO certs)
- Excessive failed login attempts
Check security-related items with detailed sub-item breakdown:
- Uptime check: >1 year without kernel update indicates vulnerability
- SSL certificates: PVE certificate expiration
- Login attempts: Excessive failed logins (brute force detection)
- Fail2Ban: Currently banned IPs (if fail2ban is installed)
Returns a result with 'checks' dict containing per-item status.
"""
try:
issues = []
checks = {
'uptime': {'status': 'OK', 'detail': ''},
'certificates': {'status': 'OK', 'detail': ''},
'login_attempts': {'status': 'OK', 'detail': ''},
'fail2ban': {'status': 'OK', 'detail': 'Not installed'}
}
# Check uptime for potential kernel vulnerabilities (if not updated)
# Sub-check 1: Uptime for potential kernel vulnerabilities
try:
uptime_seconds = time.time() - psutil.boot_time()
uptime_days = uptime_seconds / 86400
# If uptime is over a year and no recent updates, it's a warning
if uptime_days > 365:
# Check if updates check shows recent activity
updates_data = self.cached_results.get('updates_check')
if updates_data and updates_data.get('days_since_update', 9999) > 365:
issues.append(f'Uptime {int(uptime_days)} days (>1 year, consider updating kernel/system)')
msg = f'Uptime {int(uptime_days)} days (>1 year, consider updating kernel/system)'
issues.append(msg)
checks['uptime'] = {'status': 'WARNING', 'detail': msg, 'days': int(uptime_days)}
else:
checks['uptime'] = {'status': 'OK', 'detail': f'Uptime {int(uptime_days)} days, system recently updated'}
else:
checks['uptime'] = {'status': 'OK', 'detail': f'Uptime {int(uptime_days)} days'}
except Exception:
pass # Ignore if uptime calculation fails
checks['uptime'] = {'status': 'OK', 'detail': 'Unable to determine uptime'}
# Check SSL certificates (only report non-OK statuses)
# Sub-check 2: SSL certificates
cert_status = self._check_certificates()
if cert_status and cert_status.get('status') not in ['OK', 'INFO']:
issues.append(cert_status.get('reason', 'Certificate issue'))
if cert_status:
cert_sev = cert_status.get('status', 'OK')
cert_reason = cert_status.get('reason', '')
checks['certificates'] = {
'status': cert_sev,
'detail': cert_reason if cert_reason else 'Certificate valid'
}
if cert_sev not in ['OK', 'INFO']:
issues.append(cert_reason or 'Certificate issue')
# Check for excessive failed login attempts in the last 24 hours
# Sub-check 3: Failed login attempts (brute force detection)
try:
result = subprocess.run(
['journalctl', '--since', '24 hours ago', '--no-pager'],
@@ -1823,29 +2237,57 @@ class HealthMonitor:
timeout=3
)
failed_logins = 0
if result.returncode == 0:
failed_logins = 0
for line in result.stdout.split('\n'):
# Common patterns for failed logins in journald
if 'authentication failure' in line.lower() or 'failed password' in line.lower() or 'invalid user' in line.lower():
line_lower = line.lower()
if 'authentication failure' in line_lower or 'failed password' in line_lower or 'invalid user' in line_lower:
failed_logins += 1
if failed_logins > 50: # Threshold for significant failed attempts
issues.append(f'{failed_logins} failed login attempts in 24h')
if failed_logins > 50:
msg = f'{failed_logins} failed login attempts in 24h'
issues.append(msg)
checks['login_attempts'] = {'status': 'WARNING', 'detail': msg, 'count': failed_logins}
elif failed_logins > 0:
checks['login_attempts'] = {'status': 'OK', 'detail': f'{failed_logins} failed attempts in 24h (within threshold)', 'count': failed_logins}
else:
checks['login_attempts'] = {'status': 'OK', 'detail': 'No failed login attempts in 24h', 'count': 0}
except Exception:
pass # Ignore if journalctl fails
checks['login_attempts'] = {'status': 'OK', 'detail': 'Unable to check login attempts'}
# Sub-check 4: Fail2Ban ban detection
try:
f2b = self._check_fail2ban_bans()
checks['fail2ban'] = {
'status': f2b.get('status', 'OK'),
'detail': f2b.get('detail', ''),
'installed': f2b.get('installed', False),
'banned_count': f2b.get('banned_count', 0)
}
if f2b.get('status') == 'WARNING':
issues.append(f2b.get('detail', 'Fail2Ban bans detected'))
except Exception:
checks['fail2ban'] = {'status': 'OK', 'detail': 'Unable to check Fail2Ban'}
# Determine overall security status
if issues:
# Check if any sub-check is CRITICAL
has_critical = any(c.get('status') == 'CRITICAL' for c in checks.values())
overall_status = 'CRITICAL' if has_critical else 'WARNING'
return {
'status': 'WARNING', # Security issues are typically warnings
'reason': '; '.join(issues[:2]) # Show up to 2 issues
'status': overall_status,
'reason': '; '.join(issues[:2]),
'checks': checks
}
return {'status': 'OK'}
return {
'status': 'OK',
'checks': checks
}
except Exception as e:
print(f"[HealthMonitor] Error checking security: {e}")
return {'status': 'OK'}
return {'status': 'OK', 'checks': {}}
def _check_certificates(self) -> Optional[Dict[str, Any]]:
"""
@@ -2138,141 +2580,7 @@ class HealthMonitor:
'timestamp': datetime.now().isoformat()
}
# This is a duplicate of the get_detailed_status method at the top of the file.
# It's likely an oversight from copy-pasting. One of them should be removed or renamed.
# Keeping both for now to match the provided structure, but in a refactor, this would be cleaned up.
def get_detailed_status(self) -> Dict[str, Any]:
"""
Get comprehensive health status with all checks.
Returns JSON structure with ALL 10 categories always present.
Now includes persistent error tracking.
"""
active_errors = health_persistence.get_active_errors()
# No need to create persistent_issues dict here, it's implicitly handled by the checks
details = {
'cpu': {'status': 'OK'},
'memory': {'status': 'OK'},
'storage': {'status': 'OK'}, # This will be overwritten by specific storage checks
'disks': {'status': 'OK'}, # This will be overwritten by disk/filesystem checks
'network': {'status': 'OK'},
'vms': {'status': 'OK'},
'services': {'status': 'OK'},
'logs': {'status': 'OK'},
'updates': {'status': 'OK'},
'security': {'status': 'OK'}
}
critical_issues = []
warning_issues = []
info_issues = [] # Added info_issues to track INFO separately
# --- Priority Order of Checks ---
# Priority 1: Critical PVE Services
services_status = self._check_pve_services()
details['services'] = services_status
if services_status['status'] == 'CRITICAL':
critical_issues.append(f"PVE Services: {services_status.get('reason', 'Service failure')}")
elif services_status['status'] == 'WARNING':
warning_issues.append(f"PVE Services: {services_status.get('reason', 'Service issue')}")
# Priority 1.5: Proxmox Storage Check (External Module)
proxmox_storage_result = self._check_proxmox_storage()
if proxmox_storage_result: # Only process if the check ran (module available)
details['storage'] = proxmox_storage_result
if proxmox_storage_result.get('status') == 'CRITICAL':
critical_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage unavailable'))
elif proxmox_storage_result.get('status') == 'WARNING':
warning_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage issue'))
# Priority 2: Disk/Filesystem Health (Internal checks: usage, ZFS, SMART, IO errors)
storage_status = self._check_storage_optimized()
details['disks'] = storage_status # Use 'disks' for filesystem/disk specific issues
if storage_status.get('status') == 'CRITICAL':
critical_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage failure')}")
elif storage_status.get('status') == 'WARNING':
warning_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage issue')}")
# Priority 3: VMs/CTs Status (with persistence)
vms_status = self._check_vms_cts_with_persistence()
details['vms'] = vms_status
if vms_status.get('status') == 'CRITICAL':
critical_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT failure')}")
elif vms_status.get('status') == 'WARNING':
warning_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT issue')}")
# Priority 4: Network Connectivity
network_status = self._check_network_optimized()
details['network'] = network_status
if network_status.get('status') == 'CRITICAL':
critical_issues.append(f"Network: {network_status.get('reason', 'Network failure')}")
elif network_status.get('status') == 'WARNING':
warning_issues.append(f"Network: {network_status.get('reason', 'Network issue')}")
# Priority 5: CPU Usage (with hysteresis)
cpu_status = self._check_cpu_with_hysteresis()
details['cpu'] = cpu_status
if cpu_status.get('status') == 'CRITICAL':
critical_issues.append(f"CPU: {cpu_status.get('reason', 'CPU critical')}")
elif cpu_status.get('status') == 'WARNING':
warning_issues.append(f"CPU: {cpu_status.get('reason', 'CPU high')}")
# Priority 6: Memory Usage (RAM and Swap)
memory_status = self._check_memory_comprehensive()
details['memory'] = memory_status
if memory_status.get('status') == 'CRITICAL':
critical_issues.append(f"Memory: {memory_status.get('reason', 'Memory critical')}")
elif memory_status.get('status') == 'WARNING':
warning_issues.append(f"Memory: {memory_status.get('reason', 'Memory high')}")
# Priority 7: Log Analysis (with persistence)
logs_status = self._check_logs_with_persistence()
details['logs'] = logs_status
if logs_status.get('status') == 'CRITICAL':
critical_issues.append(f"Logs: {logs_status.get('reason', 'Critical log errors')}")
elif logs_status.get('status') == 'WARNING':
warning_issues.append(f"Logs: {logs_status.get('reason', 'Log warnings')}")
# Priority 8: System Updates
updates_status = self._check_updates()
details['updates'] = updates_status
if updates_status.get('status') == 'CRITICAL':
critical_issues.append(f"Updates: {updates_status.get('reason', 'System not updated')}")
elif updates_status.get('status') == 'WARNING':
warning_issues.append(f"Updates: {updates_status.get('reason', 'Updates pending')}")
elif updates_status.get('status') == 'INFO':
info_issues.append(f"Updates: {updates_status.get('reason', 'Informational update notice')}")
# Priority 9: Security Checks
security_status = self._check_security()
details['security'] = security_status
if security_status.get('status') == 'WARNING':
warning_issues.append(f"Security: {security_status.get('reason', 'Security issue')}")
elif security_status.get('status') == 'INFO':
info_issues.append(f"Security: {security_status.get('reason', 'Security information')}")
# --- Determine Overall Status ---
# Use a fixed order of severity: CRITICAL > WARNING > INFO > OK
if critical_issues:
overall = 'CRITICAL'
summary = '; '.join(critical_issues[:3]) # Limit summary to 3 issues
elif warning_issues:
overall = 'WARNING'
summary = '; '.join(warning_issues[:3])
elif info_issues:
overall = 'OK' # INFO statuses don't degrade overall health
summary = '; '.join(info_issues[:3])
else:
overall = 'OK'
summary = 'All systems operational'
return {
'overall': overall,
'summary': summary,
'details': details,
'timestamp': datetime.now().isoformat()
}
# Duplicate get_detailed_status was removed during refactor (v1.1)
# Global instance
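For completeness, a hedged usage sketch of the surface this commit extends (the module's global instance is outside this excerpt, so a direct instantiation is shown):

status = HealthMonitor().get_detailed_status()
print(f"{status['overall']}: {status['summary']}")
for category, data in status['details'].items():
    for name, check in (data.get('checks') or {}).items():
        print(f"  {category}/{name}: {check['status']} - {check.get('detail', '')}")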