Update AppImage

This commit is contained in:
MacRimi
2025-11-09 16:30:29 +01:00
parent 1712d32ef7
commit b9619efbbf
4 changed files with 330 additions and 250 deletions

@@ -4,7 +4,7 @@ Provides comprehensive, lightweight health checks for Proxmox systems.
Optimized for minimal system impact with intelligent thresholds and hysteresis.
Author: MacRimi
Version: 1.1 (Optimized for minimal overhead)
Version: 1.2 (Always returns all 10 categories)
"""
import psutil
@@ -15,12 +15,13 @@ import os
from typing import Dict, List, Any, Tuple, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import re
class HealthMonitor:
"""
Monitors system health across multiple components with minimal impact.
Implements hysteresis, intelligent caching, and progressive escalation.
Only reports problems, not verbose OK statuses.
Always returns all 10 health categories.
"""
# CPU Thresholds
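The class docstring above promises hysteresis on top of the threshold constants that follow. As a rough, self-contained illustration (the class name, counters, and values below are assumptions, not the HealthMonitor code in this diff), a hysteresis gate only raises a state after several consecutive breaches and only clears it after several consecutive recoveries:

# Illustrative sketch only; not part of this commit.
class HysteresisGate:
    def __init__(self, threshold: float, trip: int = 3, clear: int = 3):
        self.threshold = threshold   # value that counts as a breach
        self.trip = trip             # consecutive breaches needed to raise WARNING
        self.clear = clear           # consecutive good samples needed to clear it
        self.breaches = 0
        self.recoveries = 0
        self.active = False

    def update(self, value: float) -> str:
        if value >= self.threshold:
            self.breaches += 1
            self.recoveries = 0
            if self.breaches >= self.trip:
                self.active = True
        else:
            self.recoveries += 1
            self.breaches = 0
            if self.recoveries >= self.clear:
                self.active = False
        return 'WARNING' if self.active else 'OK'

# A single 95% spike stays OK; only a sustained run of breaches flips the state.
gate = HysteresisGate(threshold=90.0)
print([gate.update(v) for v in (95, 40, 95, 95, 95, 40, 40, 40)])
# ['OK', 'OK', 'OK', 'OK', 'WARNING', 'WARNING', 'WARNING', 'OK']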
@@ -186,92 +187,104 @@ class HealthMonitor:
def get_detailed_status(self) -> Dict[str, Any]:
"""
Get comprehensive health status with all checks.
Returns JSON structure matching the specification.
OPTIMIZED: Only shows problems, not verbose OK messages.
Returns JSON structure with ALL 10 categories always present.
"""
details = {}
details = {
'cpu': {'status': 'OK'},
'memory': {'status': 'OK'},
'storage': {'status': 'OK'},
'disks': {'status': 'OK'},
'network': {'status': 'OK'},
'vms': {'status': 'OK'},
'services': {'status': 'OK'},
'logs': {'status': 'OK'},
'updates': {'status': 'OK'},
'security': {'status': 'OK'}
}
critical_issues = []
warning_issues = []
# Priority 1: Services PVE
services_status = self._check_pve_services()
if services_status['status'] != 'OK':
details['services'] = services_status
if services_status['status'] == 'CRITICAL':
critical_issues.append(services_status.get('reason', 'Service failure'))
elif services_status['status'] == 'WARNING':
warning_issues.append(services_status.get('reason', 'Service issue'))
details['services'] = services_status
if services_status['status'] == 'CRITICAL':
critical_issues.append(services_status.get('reason', 'Service failure'))
elif services_status['status'] == 'WARNING':
warning_issues.append(services_status.get('reason', 'Service issue'))
# Priority 2: Storage
storage_status = self._check_storage_optimized()
if storage_status and storage_status.get('status') != 'OK':
if storage_status:
details['storage'] = storage_status
if storage_status.get('status') == 'CRITICAL':
critical_issues.append(storage_status.get('reason', 'Storage failure'))
elif storage_status.get('status') == 'WARNING':
warning_issues.append(storage_status.get('reason', 'Storage issue'))
# Priority 3: Disks
disks_status = self._check_disks_optimized()
if disks_status and disks_status.get('status') != 'OK':
if disks_status:
details['disks'] = disks_status
if disks_status.get('status') == 'CRITICAL':
critical_issues.append(disks_status.get('reason', 'Disk failure'))
elif disks_status.get('status') == 'WARNING':
warning_issues.append(disks_status.get('reason', 'Disk issue'))
# Priority 4: VMs/CTs - now detects QMP errors from logs
vms_status = self._check_vms_cts_optimized()
if vms_status and vms_status.get('status') != 'OK':
if vms_status:
details['vms'] = vms_status
if vms_status.get('status') == 'CRITICAL':
critical_issues.append(vms_status.get('reason', 'VM/CT failure'))
elif vms_status.get('status') == 'WARNING':
warning_issues.append(vms_status.get('reason', 'VM/CT issue'))
# Priority 5: Network
network_status = self._check_network_optimized()
if network_status and network_status.get('status') != 'OK':
if network_status:
details['network'] = network_status
if network_status.get('status') == 'CRITICAL':
critical_issues.append(network_status.get('reason', 'Network failure'))
elif network_status.get('status') == 'WARNING':
warning_issues.append(network_status.get('reason', 'Network issue'))
# Priority 5: CPU/RAM (only if there are problems)
# Priority 6: CPU
cpu_status = self._check_cpu_with_hysteresis()
if cpu_status.get('status') != 'OK':
details['cpu'] = cpu_status
if cpu_status.get('status') == 'WARNING':
warning_issues.append(cpu_status.get('reason', 'CPU high'))
elif cpu_status.get('status') == 'CRITICAL':
critical_issues.append(cpu_status.get('reason', 'CPU critical'))
details['cpu'] = cpu_status
if cpu_status.get('status') == 'WARNING':
warning_issues.append(cpu_status.get('reason', 'CPU high'))
elif cpu_status.get('status') == 'CRITICAL':
critical_issues.append(cpu_status.get('reason', 'CPU critical'))
# Priority 7: Memory
memory_status = self._check_memory_comprehensive()
if memory_status.get('status') != 'OK':
details['memory'] = memory_status
if memory_status.get('status') == 'CRITICAL':
critical_issues.append(memory_status.get('reason', 'Memory critical'))
elif memory_status.get('status') == 'WARNING':
warning_issues.append(memory_status.get('reason', 'Memory high'))
details['memory'] = memory_status
if memory_status.get('status') == 'CRITICAL':
critical_issues.append(memory_status.get('reason', 'Memory critical'))
elif memory_status.get('status') == 'WARNING':
warning_issues.append(memory_status.get('reason', 'Memory high'))
# Priority 6: Logs (only critical errors)
# Priority 8: Logs
logs_status = self._check_logs_lightweight()
if logs_status.get('status') != 'OK':
details['logs'] = logs_status
if logs_status.get('status') == 'CRITICAL':
critical_issues.append(logs_status.get('reason', 'Critical log errors'))
elif logs_status.get('status') == 'WARNING':
warning_issues.append(logs_status.get('reason', 'Log warnings'))
details['logs'] = logs_status
if logs_status.get('status') == 'CRITICAL':
critical_issues.append(logs_status.get('reason', 'Critical log errors'))
elif logs_status.get('status') == 'WARNING':
warning_issues.append(logs_status.get('reason', 'Log warnings'))
# Priority 9: Updates
updates_status = self._check_updates()
if updates_status and updates_status.get('status') != 'OK':
if updates_status:
details['updates'] = updates_status
if updates_status.get('status') == 'WARNING':
warning_issues.append(updates_status.get('reason', 'Updates pending'))
# Priority 7: Security (only problems)
# Priority 10: Security
security_status = self._check_security()
if security_status.get('status') != 'OK':
details['security'] = security_status
if security_status.get('status') == 'WARNING':
warning_issues.append(security_status.get('reason', 'Security issue'))
details['security'] = security_status
if security_status.get('status') == 'WARNING':
warning_issues.append(security_status.get('reason', 'Security issue'))
# Determine overall status
if critical_issues:
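The hunk ends just as the overall status is derived from critical_issues and warning_issues. For context, a sketch of how a consumer of the returned structure might reduce the ten always-present categories to a single status; only the category names are taken from the initialisation above, the rest is assumed:

# Hypothetical consumer of get_detailed_status(); not part of this commit.
def summarize(details: dict) -> str:
    statuses = [entry.get('status', 'UNKNOWN') for entry in details.values()]
    if 'CRITICAL' in statuses:
        return 'CRITICAL'
    if 'WARNING' in statuses:
        return 'WARNING'
    return 'OK'

sample = {cat: {'status': 'OK'} for cat in (
    'cpu', 'memory', 'storage', 'disks', 'network',
    'vms', 'services', 'logs', 'updates', 'security')}
sample['storage'] = {'status': 'WARNING', 'reason': '/var/lib/vz: 85% used'}
print(summarize(sample))  # -> WARNING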
@@ -498,9 +511,9 @@ class HealthMonitor:
except Exception as e:
return {'status': 'UNKNOWN', 'reason': f'Memory check failed: {str(e)}'}
def _check_storage_optimized(self) -> Optional[Dict[str, Any]]:
def _check_storage_optimized(self) -> Dict[str, Any]:
"""
Optimized storage check - only reports problems.
Optimized storage check - always returns status.
Checks critical mounts, LVM, and Proxmox storages.
"""
issues = []
@@ -510,12 +523,34 @@ class HealthMonitor:
critical_mounts = ['/', '/var/lib/vz']
for mount_point in critical_mounts:
if not os.path.exists(mount_point):
issues.append(f'{mount_point} not mounted')
storage_details[mount_point] = {
'status': 'CRITICAL',
'reason': 'Not mounted'
}
is_mounted = False
try:
result = subprocess.run(
['mountpoint', '-q', mount_point],
capture_output=True,
timeout=2
)
is_mounted = (result.returncode == 0)
except Exception:
pass  # mountpoint binary missing or the check timed out
if not is_mounted:
# Only report an error if the mount point is required to exist
if mount_point == '/':
issues.append(f'{mount_point}: Not mounted')
storage_details[mount_point] = {
'status': 'CRITICAL',
'reason': 'Not mounted'
}
# /var/lib/vz might not be a separate mount; check whether the directory exists
elif mount_point == '/var/lib/vz':
if os.path.exists(mount_point):
# It exists as directory, check usage
fs_status = self._check_filesystem(mount_point)
if fs_status['status'] != 'OK':
issues.append(f"{mount_point}: {fs_status['reason']}")
storage_details[mount_point] = fs_status
# If it doesn't exist, skip silently (a different storage may be in use)
continue
fs_status = self._check_filesystem(mount_point)
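The new code above treats a zero exit code from mountpoint -q as proof that the path is a mount point. A dependency-free sketch of an equivalent check (an alternative shown for illustration, not what the commit itself uses) is os.path.ismount:

# Alternative sketch; the commit shells out to `mountpoint -q`,
# this version falls back to the stdlib when the binary is unavailable.
import os
import subprocess

def is_mounted(path: str) -> bool:
    try:
        result = subprocess.run(['mountpoint', '-q', path],
                                capture_output=True, timeout=2)
        return result.returncode == 0
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return os.path.ismount(path)

print(is_mounted('/'))            # True on any booted Linux system
print(is_mounted('/var/lib/vz'))  # False when it is just a directory on /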
@@ -536,7 +571,6 @@ class HealthMonitor:
issues.append(f"{storage_name}: {storage_data.get('reason', 'Storage issue')}")
storage_details[storage_name] = storage_data
# If no issues, return None (optimized)
if not issues:
return {'status': 'OK'}
@@ -605,8 +639,8 @@ class HealthMonitor:
'reason': f'Check failed: {str(e)}'
}
def _check_lvm(self) -> Optional[Dict[str, Any]]:
"""Check LVM volumes, especially local-lvm"""
def _check_lvm(self) -> Dict[str, Any]:
"""Check LVM volumes - improved detection"""
try:
result = subprocess.run(
['lvs', '--noheadings', '--options', 'lv_name,vg_name,lv_attr'],
@@ -616,10 +650,9 @@ class HealthMonitor:
)
if result.returncode != 0:
return None
return {'status': 'OK'}
volumes = []
local_lvm_found = False
for line in result.stdout.strip().split('\n'):
if line.strip():
@@ -628,20 +661,11 @@ class HealthMonitor:
lv_name = parts[0].strip()
vg_name = parts[1].strip()
volumes.append(f'{vg_name}/{lv_name}')
if 'local-lvm' in lv_name or 'local-lvm' in vg_name:
local_lvm_found = True
if volumes and not local_lvm_found:
return {
'status': 'CRITICAL',
'reason': 'local-lvm volume not found'
}
return {'status': 'OK'}
return {'status': 'OK', 'volumes': len(volumes)}
except Exception:
return None
return {'status': 'OK'}
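The reworked _check_lvm above counts volumes by splitting each lvs row on whitespace. A worked example with fabricated output (real rows depend on the host's volume groups) shows what that parsing produces:

# Fabricated `lvs --noheadings --options lv_name,vg_name,lv_attr` output;
# only the split/strip logic mirrors the code above.
sample_stdout = """
  data  pve  twi-aotz--
  root  pve  -wi-ao----
  swap  pve  -wi-ao----
"""

volumes = []
for line in sample_stdout.strip().split('\n'):
    if line.strip():
        parts = line.split()
        if len(parts) >= 2:
            lv_name, vg_name = parts[0].strip(), parts[1].strip()
            volumes.append(f'{vg_name}/{lv_name}')

print({'status': 'OK', 'volumes': len(volumes)})
# {'status': 'OK', 'volumes': 3}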
def _check_proxmox_storages(self) -> Dict[str, Any]:
"""Check Proxmox-specific storages (only report problems)"""
@@ -680,9 +704,9 @@ class HealthMonitor:
return storages
def _check_disks_optimized(self) -> Optional[Dict[str, Any]]:
def _check_disks_optimized(self) -> Dict[str, Any]:
"""
Optimized disk check - only reports I/O errors and SMART issues.
Optimized disk check - always returns status.
"""
current_time = time.time()
disk_issues = {}
@@ -725,7 +749,6 @@ class HealthMonitor:
'reason': f'{error_count} I/O error(s) in 5 minutes'
}
# If no issues, return OK
if not disk_issues:
return {'status': 'OK'}
@@ -738,12 +761,11 @@ class HealthMonitor:
}
except Exception:
return None
return {'status': 'OK'}
def _check_network_optimized(self) -> Optional[Dict[str, Any]]:
def _check_network_optimized(self) -> Dict[str, Any]:
"""
Optimized network check - only reports problems.
Checks interfaces down, no connectivity.
Optimized network check - always returns status.
"""
try:
issues = []
@@ -770,7 +792,6 @@ class HealthMonitor:
issues.append(latency_status.get('reason', 'Network latency issue'))
interface_details['connectivity'] = latency_status
# If no issues, return OK
if not issues:
return {'status': 'OK'}
@@ -783,7 +804,7 @@ class HealthMonitor:
}
except Exception:
return None
return {'status': 'OK'}
def _check_network_latency(self) -> Optional[Dict[str, Any]]:
"""Check network latency to 1.1.1.1 (cached)"""
@@ -843,18 +864,18 @@ class HealthMonitor:
except Exception:
return None
def _check_vms_cts_optimized(self) -> Optional[Dict[str, Any]]:
def _check_vms_cts_optimized(self) -> Dict[str, Any]:
"""
Optimized VM/CT check - only reports failed starts.
Checks logs for VMs/CTs that failed to start.
Optimized VM/CT check - detects QMP failures and other VM errors.
Now parses logs for VM/CT-specific errors such as QMP command failures.
"""
try:
issues = []
vm_details = {}
# Check logs for failed VM/CT starts
# Check logs for VM/CT errors
result = subprocess.run(
['journalctl', '--since', '10 minutes ago', '--no-pager', '-u', 'pve*'],
['journalctl', '--since', '10 minutes ago', '--no-pager', '-u', 'pve*', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
@@ -864,34 +885,66 @@ class HealthMonitor:
for line in result.stdout.split('\n'):
line_lower = line.lower()
# Detect VM/CT start failures
# Pattern 1: "VM 106 qmp command failed"
vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command', line_lower)
if vm_qmp_match:
vmid = vm_qmp_match.group(1)
key = f'vm_{vmid}'
if key not in vm_details:
issues.append(f'VM {vmid}: QMP command error')
vm_details[key] = {
'status': 'WARNING',
'reason': 'QMP command failed',
'id': vmid,
'type': 'VM'
}
continue
# Pattern 2: "CT 103 error" or "Container 103"
ct_match = re.search(r'(?:ct|container)\s+(\d+)', line_lower)
if ct_match and ('error' in line_lower or 'fail' in line_lower):
ctid = ct_match.group(1)
key = f'ct_{ctid}'
if key not in vm_details:
issues.append(f'CT {ctid}: Error detected')
vm_details[key] = {
'status': 'WARNING',
'reason': 'Container error',
'id': ctid,
'type': 'CT'
}
continue
# Pattern 3: Generic VM/CT start failures
if 'failed to start' in line_lower or 'error starting' in line_lower or \
'start error' in line_lower or 'cannot start' in line_lower:
# Extract VM/CT ID
for word in line.split():
if word.isdigit() and len(word) <= 4:
vmid = word
if vmid not in self.failed_vm_history:
self.failed_vm_history.add(vmid)
issues.append(f'VM/CT {vmid} failed to start')
vm_details[f'vmct_{vmid}'] = {
'status': 'CRITICAL',
'reason': 'Failed to start'
}
break
id_match = re.search(r'\b(\d{3,4})\b', line)
if id_match:
vmid = id_match.group(1)
key = f'vmct_{vmid}'
if key not in vm_details:
issues.append(f'VM/CT {vmid}: Failed to start')
vm_details[key] = {
'status': 'CRITICAL',
'reason': 'Failed to start',
'id': vmid,
'type': 'VM/CT'
}
# If no issues, return OK
if not issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
return {
'status': 'CRITICAL',
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': vm_details
}
except Exception:
return None
return {'status': 'OK'}
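The three log patterns introduced above can be exercised against fabricated journal lines (the sample lines are invented; only the regexes and keywords are taken from the diff):

# Fabricated journal lines; regexes and keywords mirror the patterns above.
import re

samples = [
    'Nov 09 16:20:01 pve qm[77]: VM 106 qmp command failed - unable to connect',
    'Nov 09 16:21:07 pve pct[88]: CT 103 error: startup aborted',
    'Nov 09 16:22:44 pve pvedaemon[42]: VM 212 cannot start - config locked',
]

for line in samples:
    low = line.lower()
    if re.search(r'vm\s+(\d+)\s+qmp\s+command', low):
        print('QMP error       ->', line)
    elif re.search(r'(?:ct|container)\s+(\d+)', low) and ('error' in low or 'fail' in low):
        print('Container error ->', line)
    elif 'cannot start' in low or 'failed to start' in low:
        # Pattern 3 simply grabs the first 3-4 digit number, here the VM id 212.
        m = re.search(r'\b(\d{3,4})\b', line)
        print('Start failure   -> VM/CT', m.group(1) if m else '?')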
def _check_pve_services(self) -> Dict[str, Any]:
"""Check critical Proxmox services"""