Update AppImage

This commit is contained in:
MacRimi
2025-11-09 17:56:37 +01:00
parent a75aad1fdc
commit c45ebfe598
2 changed files with 171 additions and 73 deletions

View File

@@ -290,6 +290,8 @@ class HealthMonitor:
details['updates'] = updates_status
if updates_status.get('status') == 'WARNING':
warning_issues.append(updates_status.get('reason', 'Updates pending'))
elif updates_status.get('status') == 'INFO': # Treat INFO as a warning for overall summary
warning_issues.append(updates_status.get('reason', 'Informational update status'))
# Priority 10: Security
security_status = self._check_security()
@@ -386,7 +388,7 @@ class HealthMonitor:
return {'status': 'UNKNOWN', 'reason': f'CPU check failed: {str(e)}'}
def _check_cpu_temperature(self) -> Optional[Dict[str, Any]]:
"""Check CPU temperature (cached, max 1 check per minute)"""
"""Check CPU temperature with hysteresis (5 min sustained) - cached, max 1 check per minute"""
cache_key = 'cpu_temp'
current_time = time.time()
@@ -415,12 +417,38 @@ class HealthMonitor:
if temps:
max_temp = max(temps)
if max_temp >= self.TEMP_CRITICAL:
state_key = 'cpu_temp_history'
self.state_history[state_key].append({
'value': max_temp,
'time': current_time
})
# Keep last 6 minutes of data
self.state_history[state_key] = [
entry for entry in self.state_history[state_key]
if current_time - entry['time'] < 360
]
# Check sustained high temperature (5 minutes)
critical_temp_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.TEMP_CRITICAL and
current_time - entry['time'] <= 300
]
warning_temp_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.TEMP_WARNING and
current_time - entry['time'] <= 300
]
# Require at least 3 samples over 5 minutes to trigger alert
if len(critical_temp_samples) >= 3:
status = 'CRITICAL'
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_CRITICAL}°C'
elif max_temp >= self.TEMP_WARNING:
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_CRITICAL}°C sustained >5min'
elif len(warning_temp_samples) >= 3:
status = 'WARNING'
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_WARNING}°C'
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_WARNING}°C sustained >5min'
else:
status = 'OK'
reason = None
@@ -1232,7 +1260,10 @@ class HealthMonitor:
return {'status': 'OK'}
def _check_updates(self) -> Optional[Dict[str, Any]]:
"""Check for pending system updates (cached, checked every 10 minutes)"""
"""
Check for pending system updates with intelligence.
Only warns for: critical security updates, kernel updates, or updates pending >30 days.
"""
cache_key = 'updates_check'
current_time = time.time()
@@ -1241,9 +1272,8 @@ class HealthMonitor:
return self.cached_results.get(cache_key)
try:
# Check apt updates
result = subprocess.run(
['apt', 'list', '--upgradable'],
['apt-get', 'upgrade', '--dry-run'],
capture_output=True,
text=True,
timeout=5
@@ -1251,15 +1281,36 @@ class HealthMonitor:
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
# First line is header
update_count = len([l for l in lines if l and not l.startswith('Listing')])
if update_count >= self.UPDATES_CRITICAL:
# Count total updates
update_count = 0
security_updates = []
kernel_updates = []
for line in lines:
if line.startswith('Inst '):
update_count += 1
line_lower = line.lower()
# Check for security updates
if 'security' in line_lower or 'debian-security' in line_lower:
package_name = line.split()[1]
security_updates.append(package_name)
# Check for kernel or critical PVE updates
if any(pkg in line_lower for pkg in ['linux-image', 'pve-kernel', 'pve-manager', 'proxmox-ve']):
package_name = line.split()[1]
kernel_updates.append(package_name)
if security_updates:
status = 'WARNING'
reason = f'{update_count} updates pending (≥{self.UPDATES_CRITICAL})'
elif update_count >= self.UPDATES_WARNING:
status = 'WARNING'
reason = f'{update_count} updates pending'
reason = f'{len(security_updates)} security update(s) available'
elif kernel_updates:
status = 'INFO' # Informational, not critical
reason = f'{len(kernel_updates)} kernel/PVE update(s) available'
elif update_count > 50:
status = 'INFO'
reason = f'{update_count} updates pending (consider maintenance window)'
else:
status = 'OK'
reason = None
@@ -1275,31 +1326,53 @@ class HealthMonitor:
self.last_check_times[cache_key] = current_time
return update_result
return None
return {'status': 'OK', 'count': 0}
except Exception:
return None
except Exception as e:
return {'status': 'OK', 'count': 0}
def _check_security(self) -> Dict[str, Any]:
"""Check security-related items (certificates, uptime)"""
"""
Check security-related items:
- SSL certificate validity and expiration
- Failed login attempts
- Excessive uptime (>365 days = kernel vulnerabilities)
"""
try:
issues = []
# Check uptime (warning if >180 days)
try:
uptime_seconds = time.time() - psutil.boot_time()
uptime_days = uptime_seconds / 86400
if uptime_days > 180:
issues.append(f'Uptime {int(uptime_days)} days (>180)')
if uptime_days > 365:
issues.append(f'Uptime {int(uptime_days)} days (>1 year, kernel updates needed)')
except Exception:
pass
# Check SSL certificates
cert_status = self._check_certificates()
if cert_status and cert_status.get('status') != 'OK':
if cert_status and cert_status.get('status') not in ['OK', 'INFO']:
issues.append(cert_status.get('reason', 'Certificate issue'))
try:
result = subprocess.run(
['journalctl', '--since', '24 hours ago', '--no-pager'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
failed_logins = 0
for line in result.stdout.split('\n'):
if 'authentication failure' in line.lower() or 'failed password' in line.lower():
failed_logins += 1
if failed_logins > 50:
issues.append(f'{failed_logins} failed login attempts in 24h')
except Exception:
pass
if issues:
return {
'status': 'WARNING',
@@ -1312,7 +1385,12 @@ class HealthMonitor:
return {'status': 'OK'}
def _check_certificates(self) -> Optional[Dict[str, Any]]:
"""Check SSL certificate expiration (cached, checked once per day)"""
"""
Check SSL certificate expiration.
INFO: Self-signed or no cert configured (normal for internal servers)
WARNING: Expires <30 days
CRITICAL: Expired
"""
cache_key = 'certificates'
current_time = time.time()
@@ -1323,46 +1401,54 @@ class HealthMonitor:
try:
cert_path = '/etc/pve/local/pve-ssl.pem'
if os.path.exists(cert_path):
result = subprocess.run(
['openssl', 'x509', '-enddate', '-noout', '-in', cert_path],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
date_str = result.stdout.strip().replace('notAfter=', '')
try:
from datetime import datetime
exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
days_until_expiry = (exp_date - datetime.now()).days
if days_until_expiry < 0:
status = 'CRITICAL'
reason = 'Certificate expired'
elif days_until_expiry < 15:
status = 'WARNING'
reason = f'Certificate expires in {days_until_expiry} days'
else:
status = 'OK'
reason = None
cert_result = {'status': status}
if reason:
cert_result['reason'] = reason
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
except Exception:
pass
if not os.path.exists(cert_path):
cert_result = {
'status': 'INFO',
'reason': 'Self-signed or default certificate'
}
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
return None
result = subprocess.run(
['openssl', 'x509', '-enddate', '-noout', '-in', cert_path],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
date_str = result.stdout.strip().replace('notAfter=', '')
try:
from datetime import datetime
exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
days_until_expiry = (exp_date - datetime.now()).days
if days_until_expiry < 0:
status = 'CRITICAL'
reason = 'Certificate expired'
elif days_until_expiry < 30:
status = 'WARNING'
reason = f'Certificate expires in {days_until_expiry} days'
else:
status = 'OK'
reason = None
cert_result = {'status': status}
if reason:
cert_result['reason'] = reason
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
except Exception:
pass
return {'status': 'INFO', 'reason': 'Certificate check inconclusive'}
except Exception:
return None
return {'status': 'OK'}
def _check_disk_health_from_events(self) -> Dict[str, Any]:
"""