Update notification service

MacRimi
2026-02-21 17:23:03 +01:00
parent f134fcb528
commit de13eb5b96
3 changed files with 333 additions and 99 deletions

View File

@@ -18,6 +18,7 @@ import json
import time
import hashlib
import socket
import sqlite3
import subprocess
import threading
from queue import Queue
@@ -614,27 +615,68 @@ class TaskWatcher:
# ─── Polling Collector ────────────────────────────────────────────
class PollingCollector:
"""Periodic collector that reads Health Monitor pending notifications.
"""Periodic collector that polls health state independently.
Polls health_persistence for:
- Pending notification events (state changes from Bloque A)
- Unnotified errors
- Update availability (every 24h)
Architecture:
- Completely independent from Health Monitor's suppression system.
Suppression Duration only affects the UI health badge; it NEVER blocks
notifications.
- Reads ``get_active_errors()`` (ALL active errors, even suppressed ones)
and decides when to notify based on its own 24-hour cycle.
- For *new* errors (not seen in the previous poll cycle), notifies immediately.
- For *persistent* errors (already known), re-notifies once every 24 h.
- Update checks run on their own 24-h timer and include security counts.
Tracking is stored in ``notification_last_sent`` (same DB).
"""
DIGEST_INTERVAL = 86400 # 24 h between re-notifications
UPDATE_CHECK_INTERVAL = 86400 # 24 h between update scans
NEW_ERROR_WINDOW = 120 # seconds; errors younger than this count as "new"
_ENTITY_MAP = {
'cpu': ('node', ''), 'memory': ('node', ''), 'temperature': ('node', ''),
'disk': ('storage', ''), 'network': ('network', ''),
'pve_services': ('node', ''), 'security': ('user', ''),
'updates': ('node', ''), 'storage': ('storage', ''),
}
# Map health-persistence category names to our TEMPLATES event types.
# These must match keys in notification_templates.TEMPLATES exactly.
_CATEGORY_TO_EVENT_TYPE = {
'cpu': 'cpu_high',
'memory': 'ram_high',
'load': 'load_high',
'temperature': 'temp_high',
'disk': 'disk_space_low',
'storage': 'disk_space_low',
'network': 'network_down',
'pve_services': 'service_fail',
'security': 'auth_fail',
'updates': 'update_available',
'zfs': 'disk_io_error',
'smart': 'disk_io_error',
}
def __init__(self, event_queue: Queue, poll_interval: int = 60):
self._queue = event_queue
self._running = False
self._thread: Optional[threading.Thread] = None
self._poll_interval = poll_interval
self._hostname = _hostname()
self._last_update_check = 0
# In-memory cache: error_key -> last notification timestamp
self._last_notified: Dict[str, float] = {}
# Track known error keys so we can detect truly new ones
self._known_errors: set = set()
self._first_poll_done = False
def start(self):
if self._running:
return
self._running = True
self._load_last_notified()
self._thread = threading.Thread(target=self._poll_loop, daemon=True,
name='polling-collector')
self._thread.start()
@@ -642,92 +684,121 @@ class PollingCollector:
def stop(self):
self._running = False
# ── Main loop ──────────────────────────────────────────────
def _poll_loop(self):
"""Main polling loop."""
# Initial delay to let health monitor warm up
for _ in range(15):
if not self._running:
return
time.sleep(1)
while self._running:
try:
self._check_persistent_health()
self._check_updates()
except Exception as e:
print(f"[PollingCollector] Error: {e}")
# Sleep in small increments for responsive shutdown
for _ in range(self._poll_interval):
if not self._running:
return
time.sleep(1)
# ── Health errors (independent of suppression) ─────────────
def _check_persistent_health(self):
"""Read ALL active errors from health_persistence and decide
whether each one warrants a notification right now.
Rules:
- A *new* error (not in _known_errors) -> notify immediately
- A *persistent* error already notified -> re-notify after 24 h
- Uses its own tracking, NOT the health monitor's needs_notification flag
"""
try:
from health_persistence import health_persistence
errors = health_persistence.get_active_errors()
except ImportError:
return  # health_persistence not available (CLI mode)
except Exception as e:
print(f"[PollingCollector] Health event collection error: {e}")
print(f"[PollingCollector] get_active_errors failed: {e}")
return
now = time.time()
current_keys = set()
for error in errors:
error_key = error.get('error_key', '')
if not error_key:
continue
current_keys.add(error_key)
category = error.get('category', '')
severity = error.get('severity', 'WARNING')
reason = error.get('reason', '')
# Determine if we should notify
is_new = error_key not in self._known_errors and self._first_poll_done
last_sent = self._last_notified.get(error_key, 0)
is_due = (now - last_sent) >= self.DIGEST_INTERVAL
if not is_new and not is_due:
continue
# Map to our event type
event_type = self._CATEGORY_TO_EVENT_TYPE.get(category, 'system_problem')
entity, eid = self._ENTITY_MAP.get(category, ('node', ''))
data = {
'hostname': self._hostname,
'category': category,
'reason': reason,
'error_key': error_key,
'severity': severity,
'first_seen': error.get('first_seen', ''),
'last_seen': error.get('last_seen', ''),
'is_persistent': not is_new,
}
# Include extra details if present
details = error.get('details')
if isinstance(details, dict):
data.update(details)
elif isinstance(details, str):
try:
data.update(json.loads(details))
except (json.JSONDecodeError, TypeError):
pass
self._queue.put(NotificationEvent(
event_type, severity, data, source='health',
entity=entity, entity_id=eid or error_key,
))
# Track that we notified
self._last_notified[error_key] = now
self._persist_last_notified(error_key, now)
# Remove tracking for errors that resolved
resolved = self._known_errors - current_keys
for key in resolved:
self._last_notified.pop(key, None)
self._known_errors = current_keys
self._first_poll_done = True
# ── Update check (enriched) ────────────────────────────────
def _check_updates(self):
"""Check for available system updates (every 24h)."""
"""Check for available system updates every 24 h.
Enriched output: total count, security updates, PVE version hint,
and top package names.
"""
now = time.time()
if now - self._last_update_check < self.UPDATE_CHECK_INTERVAL:
return
self._last_update_check = now
@@ -735,33 +806,107 @@ class PollingCollector:
try:
result = subprocess.run(
['apt-get', '-s', 'upgrade'],
capture_output=True, text=True, timeout=60,
)
if result.returncode == 0:
# Count upgradeable packages
lines = [l for l in result.stdout.split('\n') if l.startswith('Inst ')]
total = len(lines)
if total == 0:
return
packages = [l.split()[1] for l in lines]
security = [p for p in packages if any(
kw in p.lower() for kw in ('security', 'cve', 'openssl', 'libssl')
)]
# Rough security-update count based on package names (keyword match above)
sec_count = len(security)
try:
sec_output = subprocess.run(
['apt-get', '-s', '--only-upgrade', 'install'] + packages[:50],
capture_output=True, text=True, timeout=30,
)
for line in sec_output.stdout.split('\n'):
if 'security' in line.lower() and 'Inst ' in line:
sec_count += 1
except Exception:
pass
# Check for PVE version upgrade
pve_packages = [p for p in packages if 'pve-' in p.lower() or 'proxmox-' in p.lower()]
# Build display details
top_pkgs = packages[:8]
details = ', '.join(top_pkgs)
if total > 8:
details += f', ... +{total - 8} more'
data = {
'hostname': self._hostname,
'count': str(total),
'security_count': str(sec_count),
'details': details,
'packages': ', '.join(packages[:20]),
}
if pve_packages:
data['pve_packages'] = ', '.join(pve_packages)
self._queue.put(NotificationEvent(
'update_available', 'INFO', data,
source='polling', entity='node', entity_id='',
))
except Exception:
pass  # Non-critical, silently skip
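The security count above keys off package names, which can both over- and under-count. A complementary heuristic, sketched below, reuses the same simulated upgrade and inspects the origin archive that apt prints on each `Inst` line (on stock Debian/Proxmox repositories, security pockets carry "security" in the origin, e.g. Debian-Security:12/stable-security; verify this for custom mirrors). `count_upgradeable` is an illustrative helper, not part of this module:

import subprocess

def count_upgradeable(timeout: int = 60) -> tuple:
    """Sketch: return (total, security) counts from 'apt-get -s upgrade'.

    Assumes each upgradeable package appears as an
    'Inst <pkg> [<old>] (<new> <origin> [<arch>])' line.
    """
    result = subprocess.run(
        ['apt-get', '-s', 'upgrade'],
        capture_output=True, text=True, timeout=timeout,
    )
    if result.returncode != 0:
        return 0, 0
    total = security = 0
    for line in result.stdout.splitlines():
        if not line.startswith('Inst '):
            continue
        total += 1
        # The origin archive lives in the parenthesised part of the line.
        origin = line.split('(', 1)[1] if '(' in line else ''
        if 'security' in origin.lower():
            security += 1
    return total, security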
# ── Persistence helpers ────────────────────────────────────
def _load_last_notified(self):
"""Load per-error notification timestamps from DB on startup."""
try:
db_path = Path('/usr/local/share/proxmenux/health_monitor.db')
if not db_path.exists():
return
conn = sqlite3.connect(str(db_path), timeout=10)
conn.execute('PRAGMA journal_mode=WAL')
cursor = conn.cursor()
cursor.execute(
"SELECT fingerprint, last_sent_ts FROM notification_last_sent "
"WHERE fingerprint LIKE 'health_%'"
)
for fp, ts in cursor.fetchall():
error_key = fp.replace('health_', '', 1)
self._last_notified[error_key] = ts
self._known_errors.add(error_key)
conn.close()
except Exception as e:
print(f"[PollingCollector] Failed to load last_notified: {e}")
def _persist_last_notified(self, error_key: str, ts: float):
"""Save per-error notification timestamp to DB."""
try:
db_path = Path('/usr/local/share/proxmenux/health_monitor.db')
conn = sqlite3.connect(str(db_path), timeout=10)
conn.execute('PRAGMA journal_mode=WAL')
conn.execute('PRAGMA busy_timeout=5000')
fp = f'health_{error_key}'
conn.execute('''
INSERT OR REPLACE INTO notification_last_sent (fingerprint, last_sent_ts, count)
VALUES (?, ?, COALESCE(
(SELECT count + 1 FROM notification_last_sent WHERE fingerprint = ?), 1
))
''', (fp, int(ts), fp))
conn.commit()
conn.close()
except Exception:
pass
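Both helpers assume the `notification_last_sent` table already exists in health_monitor.db; its creation is outside this diff. A minimal schema that satisfies the SELECT and INSERT OR REPLACE above could look like the following sketch (column names are taken from those queries; the types, constraints, and helper name are assumptions):

import sqlite3

def ensure_last_sent_table(db_path: str = '/usr/local/share/proxmenux/health_monitor.db'):
    # Sketch: create the table the persistence helpers expect, if missing.
    conn = sqlite3.connect(db_path, timeout=10)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS notification_last_sent (
                fingerprint  TEXT PRIMARY KEY,
                last_sent_ts INTEGER NOT NULL,
                count        INTEGER NOT NULL DEFAULT 1
            )
        ''')
        conn.commit()
    finally:
        conn.close()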
# ─── Proxmox Webhook Receiver ───────────────────────────────────
@@ -801,6 +946,11 @@ class ProxmoxHookWatcher:
event_type, entity, entity_id = self._classify(
notification_type, source_component, title, body, payload
)
# Discard meta-events (overall status changes, update status, etc.)
if event_type == '_skip':
return {'accepted': False, 'skipped': True, 'reason': 'Meta-event filtered'}
severity = self._map_severity(severity_raw)
data = {
@@ -830,11 +980,28 @@ class ProxmoxHookWatcher:
def _classify(self, ntype: str, component: str, title: str,
body: str, payload: dict) -> tuple:
"""Classify webhook payload into (event_type, entity, entity_id)."""
"""Classify webhook payload into (event_type, entity, entity_id).
Returns ('_skip', '', '') for meta-events we should discard.
"""
title_lower = (title or '').lower()
body_lower = (body or '').lower()
component_lower = (component or '').lower()
# ── Skip PVE meta-events ──
# PVE sends "overall status changed from OK to WARNING" which is a meta
# aggregation event. Our own health monitor handles the underlying issues
# with better granularity, so we skip these to avoid noise/duplicates.
if 'overall' in title_lower and ('changed' in title_lower or 'status' in title_lower):
return '_skip', '', ''
# ── Skip "updates changed" status events ──
# PVE sends "updates status changed from OK to WARNING" when apt updates
# are available. Our PollingCollector already handles update checks with
# proper detail (security count, package list) on a 24h cycle.
if 'updates' in title_lower and ('changed' in title_lower or 'status' in title_lower):
return '_skip', '', ''
# VM / CT lifecycle events (if sent via webhook)
vmid = str(payload.get('vmid', ''))
if any(k in component_lower for k in ('qemu', 'lxc', 'vm', 'ct', 'container')):
@@ -872,8 +1039,8 @@ class ProxmoxHookWatcher:
if 'replication' in component_lower or 'replication' in title_lower:
vmid = str(payload.get('vmid', ''))
if 'fail' in title_lower or 'error' in body_lower:
return 'replication_fail', 'vm', vmid
return 'replication_complete', 'vm', vmid
# PBS (Proxmox Backup Server)
if 'pbs' in component_lower or 'backup' in component_lower:
@@ -901,8 +1068,10 @@ class ProxmoxHookWatcher:
if 'network' in component_lower:
return 'network_down', 'network', ''
# Security -- distinguish firewall from auth
if 'firewall' in component_lower or 'firewall' in title_lower:
return 'firewall_issue', 'node', ''
if any(k in component_lower for k in ('auth', 'security', 'pam', 'sshd')):
return 'auth_fail', 'user', ''
# Fallback: system_problem generic
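To make the new skip rules concrete, here is a condensed, hypothetical illustration of how titles flow through the classifier (a standalone sketch, not the real `_classify`, which also inspects component, body, and payload):

def classify_title(title: str, component: str = '') -> str:
    # Condensed sketch of the rules above.
    t, c = title.lower(), component.lower()
    if 'overall' in t and ('changed' in t or 'status' in t):
        return '_skip'   # PVE meta aggregation event
    if 'updates' in t and ('changed' in t or 'status' in t):
        return '_skip'   # PollingCollector already covers update checks
    if 'replication' in c or 'replication' in t:
        return 'replication_fail' if 'fail' in t else 'replication_complete'
    return 'system_problem'  # generic fallback

assert classify_title('overall status changed from OK to WARNING') == '_skip'
assert classify_title('updates status changed from OK to WARNING') == '_skip'
assert classify_title('Replication job 100-0 failed', 'replication') == 'replication_fail'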

View File

@@ -495,10 +495,23 @@ class NotificationManager:
self._dispatch_event(event)
def _process_event_direct(self, event: NotificationEvent):
"""Process a burst summary event. Bypasses aggregator but applies all other filters."""
"""Process a burst summary event. Bypasses aggregator but applies ALL other filters."""
if not self._enabled:
return
# Check group filter (same as _process_event)
template = TEMPLATES.get(event.event_type, {})
event_group = template.get('group', 'system')
group_setting = f'events.{event_group}'
if self._config.get(group_setting, 'true') == 'false':
return
# Check per-event filter (same as _process_event)
default_enabled = 'true' if template.get('default_enabled', True) else 'false'
event_specific = f'event.{event.event_type}'
if self._config.get(event_specific, default_enabled) == 'false':
return
# Check severity filter (same mapping as _process_event)
severity_map = {'all': 'INFO', 'warning': 'WARNING', 'critical': 'CRITICAL'}
raw_filter = self._config.get('severity_filter', 'all')
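The hunk stops just before the actual severity comparison; presumably it mirrors `_process_event`. A sketch of what that comparison likely amounts to (the rank ordering and helper name are assumptions):

_SEVERITY_RANK = {'INFO': 0, 'WARNING': 1, 'CRITICAL': 2}

def passes_severity_filter(event_severity: str, raw_filter: str) -> bool:
    # Map the configured filter to a minimum severity, then compare ranks.
    severity_map = {'all': 'INFO', 'warning': 'WARNING', 'critical': 'CRITICAL'}
    threshold = severity_map.get(raw_filter, 'INFO')
    return _SEVERITY_RANK.get(event_severity.upper(), 0) >= _SEVERITY_RANK[threshold]

# With severity_filter = 'warning', an INFO burst summary is dropped:
assert passes_severity_filter('INFO', 'warning') is False
assert passes_severity_filter('CRITICAL', 'warning') is True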

View File

@@ -45,11 +45,14 @@ SEVERITY_ICONS_DISCORD = {
TEMPLATES = {
# ── Health Monitor state changes ──
# NOTE: state_change is disabled by default -- it fires on every
# status oscillation (OK->WARNING->OK), which creates noise.
# The health_persistent and new_error templates cover this better.
'state_change': {
'title': '{hostname}: {category} changed to {current}',
'body': '{category} status changed from {previous} to {current}.\n{reason}',
'group': 'system',
'default_enabled': False,
},
'new_error': {
'title': '{hostname}: New {severity} - {category}',
@@ -137,6 +140,18 @@ TEMPLATES = {
'group': 'vm_ct',
'default_enabled': True,
},
'replication_fail': {
'title': '{hostname}: Replication FAILED - {vmid}',
'body': 'Replication of {vmname} ({vmid}) has failed.\n{reason}',
'group': 'vm_ct',
'default_enabled': True,
},
'replication_complete': {
'title': '{hostname}: Replication complete - {vmid}',
'body': 'Replication of {vmname} ({vmid}) completed successfully.',
'group': 'vm_ct',
'default_enabled': False,
},
# ── Backup / Snapshot events ──
'backup_start': {
@@ -314,6 +329,40 @@ TEMPLATES = {
'default_enabled': False,
},
# ── Persistent Health Issues (daily digest) ──
'health_persistent': {
'title': '{hostname}: {count} active health issue(s)',
'body': 'The following health issues remain active:\n{issue_list}\n\nThis digest is sent once every 24 hours while issues persist.',
'group': 'system',
'default_enabled': True,
},
'health_issue_new': {
'title': '{hostname}: New health issue - {category}',
'body': 'New {severity} issue detected:\n{reason}',
'group': 'system',
'default_enabled': True,
},
'health_issue_resolved': {
'title': '{hostname}: Resolved - {category}',
'body': '{category} issue has been resolved.\n{reason}\nDuration: {duration}',
'group': 'system',
'default_enabled': True,
},
# ── Update notifications (enriched) ──
'update_summary': {
'title': '{hostname}: {total_count} updates available',
'body': '{security_count} security update(s), {total_count} total.\n{package_list}',
'group': 'system',
'default_enabled': True,
},
'pve_update': {
'title': '{hostname}: PVE update available ({version})',
'body': 'Proxmox VE update available: {version}\n{details}',
'group': 'system',
'default_enabled': True,
},
# ── Burst aggregation summaries ──
'burst_auth_fail': {
'title': '{hostname}: {count} auth failures in {window}',
@@ -407,6 +456,9 @@ def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
'used': '', 'total': '', 'available': '', 'cores': '',
'count': '', 'size': '', 'snapshot_name': '', 'jail': '',
'failures': '', 'quorum': '', 'change_details': '', 'message': '',
'security_count': '0', 'total_count': '0', 'package_list': '',
'packages': '', 'pve_packages': '', 'version': '',
'issue_list': '', 'error_key': '',
}
variables.update(data)
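A short usage sketch showing how the new defaults interact with event data for the 'update_summary' template (the hostname and package names are illustrative; the exact return shape of render_template is not shown in this hunk):

data = {
    'hostname': 'pve01',
    'total_count': '14',
    'security_count': '3',
    'package_list': 'libssl3, pve-kernel-6.8, qemu-server',
}
msg = render_template('update_summary', data)
# Title renders as: "pve01: 14 updates available"
# Body renders as:  "3 security update(s), 14 total." followed by the package list
# Any placeholder missing from `data` (e.g. {version}) falls back to the defaults above.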