2026-02-18 17:24:26 +01:00
|
|
|
|
"""
|
|
|
|
|
|
ProxMenux Notification Event Watchers
|
|
|
|
|
|
Detects Proxmox events from journald, PVE task log, and health monitor.
|
|
|
|
|
|
|
|
|
|
|
|
Architecture:
|
|
|
|
|
|
- JournalWatcher: Real-time stream of journald for critical events
|
|
|
|
|
|
- TaskWatcher: Real-time tail of /var/log/pve/tasks/index for VM/CT/backup events
|
|
|
|
|
|
- PollingCollector: Periodic poll of health_persistence pending notifications
|
|
|
|
|
|
|
|
|
|
|
|
All watchers put events into a shared Queue consumed by NotificationManager.
|
|
|
|
|
|
|
|
|
|
|
|
Author: MacRimi
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
|
import re
|
|
|
|
|
|
import json
|
|
|
|
|
|
import time
|
2026-02-19 17:02:02 +01:00
|
|
|
|
import hashlib
|
2026-02-18 17:24:26 +01:00
|
|
|
|
import socket
|
2026-02-21 17:23:03 +01:00
|
|
|
|
import sqlite3
|
2026-02-18 17:24:26 +01:00
|
|
|
|
import subprocess
|
|
|
|
|
|
import threading
|
|
|
|
|
|
from queue import Queue
|
2026-02-19 17:02:02 +01:00
|
|
|
|
from typing import Optional, Dict, Any, Tuple
|
2026-02-18 17:24:26 +01:00
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ─── Event Object ─────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
class NotificationEvent:
    """A single detected event, ready for notification dispatch.

    Attributes:
        event_type: Taxonomy key (e.g. 'vm_fail', 'auth_fail', 'split_brain')
        severity: INFO | WARNING | CRITICAL
        data: Context payload (hostname, vmid, reason, etc.)
        source: Origin: journal | tasks | health | proxmox_hook | cli | api | polling
        entity: Affected kind: node | vm | ct | storage | disk | network | cluster | user
        entity_id: Concrete identifier (vmid, IP, device, pool, interface, etc.)
        raw: Original payload (webhook JSON or log line), optional
        fingerprint: Stable dedup key: hostname:entity:entity_id:event_type
        event_id: Short hash of the fingerprint, used for correlation
        ts_epoch: time.time() at creation
        ts_monotonic: time.monotonic() at creation (drift-safe for cooldowns)
        timestamp: Alias of ts_epoch kept for backward compatibility
    """

    __slots__ = (
        'event_type', 'severity', 'data', 'timestamp', 'source',
        'entity', 'entity_id', 'raw',
        'fingerprint', 'event_id', 'ts_epoch', 'ts_monotonic',
    )

    def __init__(self, event_type: str, severity: str = 'INFO',
                 data: Optional[Dict[str, Any]] = None,
                 source: str = 'watcher',
                 entity: str = 'node', entity_id: str = '',
                 raw: Any = None):
        self.event_type = event_type
        self.severity = severity
        self.data = data or {}
        self.source = source
        self.entity = entity
        self.entity_id = entity_id
        self.raw = raw
        self.ts_epoch = time.time()
        self.ts_monotonic = time.monotonic()
        self.timestamp = self.ts_epoch  # backward compat

        # Derive the stable fingerprint used for dedup/cooldown windows.
        host = self.data.get('hostname', _hostname())
        if entity_id:
            key = f"{host}:{entity}:{entity_id}:{event_type}"
        else:
            # No entity id: mix in a short hash of the reason/title so that
            # distinct messages of the same type do not collapse together.
            detail = self.data.get('reason', self.data.get('title', ''))
            suffix = hashlib.md5(detail.encode(errors='replace')).hexdigest()[:8] if detail else ''
            key = f"{host}:{entity}:{event_type}:{suffix}"

        self.fingerprint = key
        self.event_id = hashlib.md5(key.encode()).hexdigest()[:12]

    def __repr__(self):
        return f"NotificationEvent({self.event_type}, {self.severity}, fp={self.fingerprint[:40]})"
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _hostname() -> str:
|
|
|
|
|
|
try:
|
|
|
|
|
|
return socket.gethostname().split('.')[0]
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
return 'proxmox'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ─── Journal Watcher (Real-time) ─────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
class JournalWatcher:
    """Watches journald in real-time for critical system events.

    Streams entries via a 'journalctl -f -o json' subprocess and runs each
    message through a set of pattern detectors. Detects: auth failures,
    fail2ban bans, kernel panics, OOM, service crashes, disk I/O errors,
    split-brain, node disconnect, system shutdown, permission changes and
    firewall rule errors. Matched events are emitted into the shared queue
    with a 30 s per-fingerprint dedup window.
    """

    def __init__(self, event_queue: Queue):
        """event_queue: shared queue consumed by NotificationManager."""
        self._queue = event_queue
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._process: Optional[subprocess.Popen] = None
        self._hostname = _hostname()

        # Short-term dedup: fingerprint -> last emit time (epoch seconds)
        self._recent_events: Dict[str, float] = {}
        self._dedup_window = 30  # seconds

    def start(self):
        """Start the journal watcher thread (idempotent)."""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._watch_loop, daemon=True,
                                        name='journal-watcher')
        self._thread.start()

    def stop(self):
        """Stop the watcher and terminate the journalctl subprocess."""
        self._running = False
        if self._process:
            try:
                self._process.terminate()
                self._process.wait(timeout=5)
            except Exception:
                # terminate() failed or timed out -> force kill, best effort
                try:
                    self._process.kill()
                except Exception:
                    pass

    def _watch_loop(self):
        """Main watch loop with auto-restart on failure.

        The sleep runs on every iteration (not only on exceptions) so a
        cleanly-exiting journalctl does not cause a busy respawn loop.
        """
        while self._running:
            try:
                self._run_journalctl()
            except Exception as e:
                print(f"[JournalWatcher] Error: {e}")
            if self._running:
                time.sleep(5)  # Wait before restart

    def _run_journalctl(self):
        """Run journalctl -f and process output line by line."""
        cmd = ['journalctl', '-f', '-o', 'json', '--no-pager',
               '-n', '0']  # Start from now, don't replay history

        self._process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
            text=True, bufsize=1  # line-buffered text stream
        )

        for line in self._process.stdout:
            if not self._running:
                break
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
                self._process_entry(entry)
            except (json.JSONDecodeError, KeyError):
                # Try plain text matching as fallback
                self._process_plain(line)

        if self._process:
            self._process.wait()

    def _process_entry(self, entry: Dict):
        """Run a parsed journald JSON entry through every detector."""
        msg = entry.get('MESSAGE', '')
        # MESSAGE may be absent or a byte array for binary payloads
        if not msg or not isinstance(msg, str):
            return

        unit = entry.get('_SYSTEMD_UNIT', '')
        syslog_id = entry.get('SYSLOG_IDENTIFIER', '')
        priority = int(entry.get('PRIORITY', 6))

        self._check_auth_failure(msg, syslog_id, entry)
        self._check_fail2ban(msg, syslog_id)
        self._check_kernel_critical(msg, syslog_id, priority)
        self._check_service_failure(msg, unit)
        self._check_disk_io(msg, syslog_id, priority)
        self._check_cluster_events(msg, syslog_id)
        self._check_system_shutdown(msg, syslog_id)
        self._check_permission_change(msg, syslog_id)
        self._check_firewall(msg, syslog_id)

    def _process_plain(self, line: str):
        """Fallback: run a plain (non-JSON) log line through key detectors."""
        self._check_auth_failure(line, '', {})
        self._check_fail2ban(line, '')
        self._check_kernel_critical(line, '', 6)
        self._check_cluster_events(line, '')
        self._check_system_shutdown(line, '')

    # ── Detection methods ──

    def _check_auth_failure(self, msg: str, syslog_id: str, entry: Dict):
        """Detect authentication failures (SSH, PAM, PVE)."""
        patterns = [
            (r'Failed password for (?:invalid user )?(\S+) from (\S+)', 'ssh'),
            (r'authentication failure.*rhost=(\S+).*user=(\S+)', 'pam'),
            (r'pvedaemon\[.*authentication failure.*rhost=(\S+)', 'pve'),
        ]

        for pattern, service in patterns:
            match = re.search(pattern, msg, re.IGNORECASE)
            if match:
                groups = match.groups()
                # Group order differs per pattern: ssh = (user, ip),
                # pam = (ip, user), pve only captures the remote host.
                if service == 'ssh':
                    username, source_ip = groups[0], groups[1]
                elif service == 'pam':
                    source_ip, username = groups[0], groups[1]
                else:
                    source_ip = groups[0]
                    username = 'unknown'

                self._emit('auth_fail', 'WARNING', {
                    'source_ip': source_ip,
                    'username': username,
                    'service': service,
                    'hostname': self._hostname,
                }, entity='user', entity_id=source_ip)
                return

    def _check_fail2ban(self, msg: str, syslog_id: str):
        """Detect Fail2Ban IP bans.

        Uses a word boundary (\\bBan) so 'Unban' lines are not reported
        as new bans.
        """
        if 'fail2ban' not in msg.lower() and syslog_id != 'fail2ban-server':
            return

        # Ban detected ('\b' prevents matching the 'Ban' inside 'Unban')
        ban_match = re.search(r'\bBan\s+(\S+)', msg)
        if ban_match:
            ip = ban_match.group(1)
            jail_match = re.search(r'\[(\w+)\]', msg)
            jail = jail_match.group(1) if jail_match else 'unknown'

            self._emit('ip_block', 'INFO', {
                'source_ip': ip,
                'jail': jail,
                'failures': '',
                'hostname': self._hostname,
            }, entity='user', entity_id=ip)

    def _check_kernel_critical(self, msg: str, syslog_id: str, priority: int):
        """Detect kernel panics, OOM, segfaults, FS and hardware errors."""
        critical_patterns = {
            r'kernel panic': ('system_problem', 'CRITICAL', 'Kernel panic'),
            r'Out of memory': ('system_problem', 'CRITICAL', 'Out of memory killer activated'),
            r'segfault': ('system_problem', 'WARNING', 'Segmentation fault detected'),
            r'BUG:': ('system_problem', 'CRITICAL', 'Kernel BUG detected'),
            r'Call Trace:': ('system_problem', 'WARNING', 'Kernel call trace'),
            r'I/O error.*dev\s+(\S+)': ('disk_io_error', 'CRITICAL', 'Disk I/O error'),
            r'EXT4-fs error': ('disk_io_error', 'CRITICAL', 'Filesystem error'),
            r'BTRFS error': ('disk_io_error', 'CRITICAL', 'Filesystem error'),
            r'XFS.*error': ('disk_io_error', 'CRITICAL', 'Filesystem error'),
            r'ZFS.*error': ('disk_io_error', 'CRITICAL', 'ZFS pool error'),
            r'mce:.*Hardware Error': ('system_problem', 'CRITICAL', 'Hardware error (MCE)'),
        }

        for pattern, (event_type, severity, reason) in critical_patterns.items():
            if re.search(pattern, msg, re.IGNORECASE):
                data = {'reason': reason, 'hostname': self._hostname}
                entity = 'node'
                entity_id = ''

                # Try to extract device for disk errors
                dev_match = re.search(r'dev\s+(\S+)', msg)
                if dev_match and event_type == 'disk_io_error':
                    data['device'] = dev_match.group(1)
                    entity = 'disk'
                    entity_id = dev_match.group(1)

                self._emit(event_type, severity, data, entity=entity, entity_id=entity_id)
                return

    def _check_service_failure(self, msg: str, unit: str):
        """Detect systemd service failures."""
        service_patterns = [
            r'Failed to start (.+)',
            r'Unit (\S+) (?:entered failed state|failed)',
            r'(\S+)\.service: (?:Main process exited|Failed with result)',
        ]

        for pattern in service_patterns:
            match = re.search(pattern, msg)
            if match:
                service_name = match.group(1)
                self._emit('service_fail', 'WARNING', {
                    'service_name': service_name,
                    'reason': msg[:200],
                    'hostname': self._hostname,
                }, entity='node', entity_id=service_name)
                return

    def _check_disk_io(self, msg: str, syslog_id: str, priority: int):
        """Detect disk I/O errors from kernel messages.

        Only considers kernel messages or entries at error priority
        (<= 3) to keep noise down.
        """
        if syslog_id != 'kernel' and priority > 3:
            return

        io_patterns = [
            r'blk_update_request: I/O error.*dev (\S+)',
            r'Buffer I/O error on device (\S+)',
            r'SCSI error.*sd(\w)',
            r'ata\d+.*error',
        ]

        for pattern in io_patterns:
            match = re.search(pattern, msg)
            if match:
                # The last pattern has no capture group -> lastindex is None
                device = match.group(1) if match.lastindex else 'unknown'
                self._emit('disk_io_error', 'CRITICAL', {
                    'device': device,
                    'reason': msg[:200],
                    'hostname': self._hostname,
                }, entity='disk', entity_id=device)
                return

    def _check_cluster_events(self, msg: str, syslog_id: str):
        """Detect cluster split-brain and node disconnect."""
        msg_lower = msg.lower()

        # Split-brain
        if any(p in msg_lower for p in ['split-brain', 'split brain',
                                        'fencing required', 'cluster partition']):
            quorum = 'unknown'
            if 'quorum' in msg_lower:
                quorum = 'lost' if 'lost' in msg_lower else 'valid'

            self._emit('split_brain', 'CRITICAL', {
                'quorum': quorum,
                'reason': msg[:200],
                'hostname': self._hostname,
            }, entity='cluster', entity_id=self._hostname)
            return

        # Node disconnect
        if (('quorum' in msg_lower and 'lost' in msg_lower) or
                ('node' in msg_lower and any(w in msg_lower for w in ['left', 'offline', 'lost']))):

            node_match = re.search(r'[Nn]ode\s+(\S+)', msg)
            node_name = node_match.group(1) if node_match else 'unknown'

            self._emit('node_disconnect', 'CRITICAL', {
                'node_name': node_name,
                'hostname': self._hostname,
            }, entity='cluster', entity_id=node_name)

    def _check_system_shutdown(self, msg: str, syslog_id: str):
        """Detect system shutdown/reboot from systemd messages."""
        if 'systemd-journald' in syslog_id or 'systemd' in syslog_id:
            if 'Journal stopped' in msg or 'Stopping Journal Service' in msg:
                self._emit('system_shutdown', 'WARNING', {
                    'reason': 'System journal stopped',
                    'hostname': self._hostname,
                }, entity='node', entity_id='')
            elif 'Shutting down' in msg or 'System is rebooting' in msg:
                event = 'system_reboot' if 'reboot' in msg.lower() else 'system_shutdown'
                self._emit(event, 'WARNING', {
                    'reason': msg[:200],
                    'hostname': self._hostname,
                }, entity='node', entity_id='')

    def _check_permission_change(self, msg: str, syslog_id: str):
        """Detect user permission changes in PVE."""
        permission_patterns = [
            (r'set permissions.*user\s+(\S+)', 'Permission changed'),
            (r'user added to group.*?(\S+)', 'Added to group'),
            (r'user removed from group.*?(\S+)', 'Removed from group'),
            (r'ACL updated.*?(\S+)', 'ACL updated'),
            (r'Role assigned.*?(\S+)', 'Role assigned'),
        ]

        for pattern, action in permission_patterns:
            match = re.search(pattern, msg, re.IGNORECASE)
            if match:
                username = match.group(1)
                self._emit('user_permission_change', 'INFO', {
                    'username': username,
                    'change_details': action,
                    'hostname': self._hostname,
                }, entity='user', entity_id=username)
                return

    def _check_firewall(self, msg: str, syslog_id: str):
        """Detect firewall issues (not individual drops, but rule errors)."""
        if re.search(r'pve-firewall.*(?:error|failed|unable)', msg, re.IGNORECASE):
            self._emit('firewall_issue', 'WARNING', {
                'reason': msg[:200],
                'hostname': self._hostname,
            }, entity='network', entity_id='')

    # ── Emit helper ──

    def _emit(self, event_type: str, severity: str, data: Dict,
              entity: str = 'node', entity_id: str = ''):
        """Emit event to queue with short-term deduplication (30s window)."""
        event = NotificationEvent(
            event_type, severity, data, source='journal',
            entity=entity, entity_id=entity_id,
        )

        now = time.time()
        last = self._recent_events.get(event.fingerprint, 0)
        if now - last < self._dedup_window:
            return  # Skip duplicate within 30s window

        self._recent_events[event.fingerprint] = now

        # Cleanup old dedup entries periodically
        if len(self._recent_events) > 200:
            cutoff = now - self._dedup_window * 2
            self._recent_events = {
                k: v for k, v in self._recent_events.items() if v > cutoff
            }

        self._queue.put(event)
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ─── Task Watcher (Real-time) ────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
class TaskWatcher:
    """Watches /var/log/pve/tasks/index for VM/CT and backup events.

    The PVE task index file is appended when tasks start/finish.
    Format: UPID endtime status, where
    UPID = UPID:node:pid:pstart:starttime:type:id:user:
    The final status is recorded when the task completes and may contain
    spaces (e.g. "unable to open file ...").
    """

    TASK_LOG = '/var/log/pve/tasks/index'

    # Map PVE task types to our event types (event_type, default severity)
    TASK_MAP = {
        'qmstart': ('vm_start', 'INFO'),
        'qmstop': ('vm_stop', 'INFO'),
        'qmshutdown': ('vm_shutdown', 'INFO'),
        'qmreboot': ('vm_restart', 'INFO'),
        'qmreset': ('vm_restart', 'INFO'),
        'vzstart': ('ct_start', 'INFO'),
        'vzstop': ('ct_stop', 'INFO'),
        'vzshutdown': ('ct_stop', 'INFO'),
        'vzdump': ('backup_start', 'INFO'),
        'qmsnapshot': ('snapshot_complete', 'INFO'),
        'vzsnapshot': ('snapshot_complete', 'INFO'),
        'qmigrate': ('migration_start', 'INFO'),
        'vzmigrate': ('migration_start', 'INFO'),
    }

    def __init__(self, event_queue: Queue):
        """event_queue: shared queue consumed by NotificationManager."""
        self._queue = event_queue
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._hostname = _hostname()
        self._last_position = 0
        # Set by NotificationManager to point at ProxmoxHookWatcher._delivered
        # so we can skip events the webhook already delivered with richer data.
        self._webhook_delivered: Optional[dict] = None

    def start(self):
        """Start the tail thread, skipping history already in the file."""
        if self._running:
            return
        self._running = True

        # Start at end of file
        if os.path.exists(self.TASK_LOG):
            try:
                self._last_position = os.path.getsize(self.TASK_LOG)
            except OSError:
                self._last_position = 0

        self._thread = threading.Thread(target=self._watch_loop, daemon=True,
                                        name='task-watcher')
        self._thread.start()

    def stop(self):
        """Signal the watch loop to exit."""
        self._running = False

    def _watch_loop(self):
        """Poll the task index file every 2 s for newly appended lines."""
        while self._running:
            try:
                if os.path.exists(self.TASK_LOG):
                    current_size = os.path.getsize(self.TASK_LOG)

                    if current_size < self._last_position:
                        # File was truncated/rotated
                        self._last_position = 0

                    if current_size > self._last_position:
                        with open(self.TASK_LOG, 'r') as f:
                            f.seek(self._last_position)
                            new_lines = f.readlines()
                            self._last_position = f.tell()

                        for line in new_lines:
                            self._process_task_line(line.strip())
            except Exception as e:
                print(f"[TaskWatcher] Error reading task log: {e}")

            time.sleep(2)  # Check every 2 seconds

    def _process_task_line(self, line: str):
        """Process a single task index line.

        PVE task index format (space-separated):
            UPID endtime status
        Where UPID = UPID:node:pid:pstart:starttime:type:id:user:
        """
        if not line:
            return

        parts = line.split()
        if not parts:
            return

        upid = parts[0]
        # Status may contain spaces ("unable to open file ...") so join
        # everything after the endtime field instead of taking one word.
        status = ' '.join(parts[2:]) if len(parts) >= 3 else ''

        # Parse UPID
        upid_parts = upid.split(':')
        if len(upid_parts) < 8:
            return

        task_type = upid_parts[5]
        vmid = upid_parts[6]
        user = upid_parts[7]

        # Get VM/CT name
        vmname = self._get_vm_name(vmid) if vmid else ''

        # Map to event type
        event_info = self.TASK_MAP.get(task_type)
        if not event_info:
            return

        event_type, default_severity = event_info

        # Check if task failed (empty status means the task just started)
        is_error = bool(status) and status != 'OK'

        if is_error:
            # Override to failure event
            if 'start' in event_type:
                event_type = event_type.replace('_start', '_fail')
            elif 'complete' in event_type:
                event_type = event_type.replace('_complete', '_fail')
            severity = 'CRITICAL'
        elif status == 'OK':
            # Task completed successfully
            if event_type == 'backup_start':
                event_type = 'backup_complete'
            elif event_type == 'migration_start':
                event_type = 'migration_complete'
            severity = 'INFO'
        else:
            # Task just started (no status yet)
            severity = default_severity

        data = {
            'vmid': vmid,
            'vmname': vmname or f'ID {vmid}',
            'hostname': self._hostname,
            'user': user,
            'reason': status if is_error else '',
            'target_node': '',
            'size': '',
            'snapshot_name': '',
        }

        # Determine entity type from task type
        entity = 'ct' if task_type.startswith('vz') else 'vm'

        # ── Cross-source dedup: yield to PVE webhook for backup/replication ──
        # The webhook delivers richer data (full logs, sizes, durations).
        # If the webhook already delivered this event within 120s, skip.
        # For backup events, PVE sends ONE webhook for the entire vzdump job
        # (covering all VMs), while TaskWatcher sees individual per-VM tasks.
        # So we check by event_type ONLY (no VMID) -- if ANY backup_complete
        # arrived from webhook recently, skip ALL backup_complete from tasks.
        _WEBHOOK_TYPES = {'backup_complete', 'backup_fail', 'backup_start',
                          'replication_complete', 'replication_fail'}
        if event_type in _WEBHOOK_TYPES and self._webhook_delivered:
            # Check type-only key first (covers multi-VM jobs)
            type_key = f"{event_type}:"
            for dkey, dtime in self._webhook_delivered.items():
                if dkey.startswith(type_key) and (time.time() - dtime) < 120:
                    return  # Webhook already delivered this with richer data

        self._queue.put(NotificationEvent(
            event_type, severity, data, source='tasks',
            entity=entity, entity_id=vmid,
        ))

    def _get_vm_name(self, vmid: str) -> str:
        """Try to resolve VMID to name via PVE config files ('' if unknown)."""
        if not vmid:
            return ''

        # Try QEMU
        conf_path = f'/etc/pve/qemu-server/{vmid}.conf'
        name = self._read_name_from_conf(conf_path)
        if name:
            return name

        # Try LXC
        conf_path = f'/etc/pve/lxc/{vmid}.conf'
        name = self._read_name_from_conf(conf_path)
        if name:
            return name

        return ''

    @staticmethod
    def _read_name_from_conf(path: str) -> str:
        """Read 'name:' (QEMU) or 'hostname:' (LXC) from a PVE config file."""
        try:
            if not os.path.exists(path):
                return ''
            with open(path, 'r') as f:
                for line in f:
                    if line.startswith('name:'):
                        return line.split(':', 1)[1].strip()
                    if line.startswith('hostname:'):
                        return line.split(':', 1)[1].strip()
        except OSError:
            # Unreadable config (permissions, races) -> best effort, no name
            pass
        return ''
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ─── Polling Collector ────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
class PollingCollector:
|
2026-02-21 17:23:03 +01:00
|
|
|
|
"""Periodic collector that polls health state independently.
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
Architecture:
|
|
|
|
|
|
- Completely independent from Health Monitor's suppression system.
|
|
|
|
|
|
Suppression Duration only affects the UI health badge; it NEVER blocks
|
|
|
|
|
|
notifications.
|
|
|
|
|
|
- Reads ``get_active_errors()`` (ALL active errors, even suppressed ones)
|
|
|
|
|
|
and decides when to notify based on its own 24-hour cycle.
|
|
|
|
|
|
- For *new* errors (first_seen within the last poll interval), notifies
|
|
|
|
|
|
immediately.
|
|
|
|
|
|
- For *persistent* errors (already known), re-notifies once every 24 h.
|
|
|
|
|
|
- Update checks run on their own 24-h timer and include security counts.
|
|
|
|
|
|
|
|
|
|
|
|
Tracking is stored in ``notification_last_sent`` (same DB).
|
2026-02-18 17:24:26 +01:00
|
|
|
|
"""
|
|
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
DIGEST_INTERVAL = 86400 # 24 h between re-notifications
|
|
|
|
|
|
UPDATE_CHECK_INTERVAL = 86400 # 24 h between update scans
|
|
|
|
|
|
NEW_ERROR_WINDOW = 120 # seconds – errors younger than this are "new"
|
|
|
|
|
|
|
|
|
|
|
|
_ENTITY_MAP = {
|
|
|
|
|
|
'cpu': ('node', ''), 'memory': ('node', ''), 'temperature': ('node', ''),
|
|
|
|
|
|
'disk': ('storage', ''), 'network': ('network', ''),
|
|
|
|
|
|
'pve_services': ('node', ''), 'security': ('user', ''),
|
|
|
|
|
|
'updates': ('node', ''), 'storage': ('storage', ''),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# Map health-persistence category names to our TEMPLATES event types.
|
|
|
|
|
|
# These must match keys in notification_templates.TEMPLATES exactly.
|
|
|
|
|
|
_CATEGORY_TO_EVENT_TYPE = {
|
|
|
|
|
|
'cpu': 'cpu_high',
|
|
|
|
|
|
'memory': 'ram_high',
|
|
|
|
|
|
'load': 'load_high',
|
|
|
|
|
|
'temperature': 'temp_high',
|
|
|
|
|
|
'disk': 'disk_space_low',
|
|
|
|
|
|
'storage': 'disk_space_low',
|
|
|
|
|
|
'network': 'network_down',
|
|
|
|
|
|
'pve_services': 'service_fail',
|
|
|
|
|
|
'security': 'auth_fail',
|
|
|
|
|
|
'updates': 'update_available',
|
|
|
|
|
|
'zfs': 'disk_io_error',
|
|
|
|
|
|
'smart': 'disk_io_error',
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, event_queue: Queue, poll_interval: int = 60):
|
2026-02-18 17:24:26 +01:00
|
|
|
|
self._queue = event_queue
|
|
|
|
|
|
self._running = False
|
|
|
|
|
|
self._thread: Optional[threading.Thread] = None
|
|
|
|
|
|
self._poll_interval = poll_interval
|
|
|
|
|
|
self._hostname = _hostname()
|
|
|
|
|
|
self._last_update_check = 0
|
2026-02-21 17:23:03 +01:00
|
|
|
|
# In-memory cache: error_key -> last notification timestamp
|
|
|
|
|
|
self._last_notified: Dict[str, float] = {}
|
|
|
|
|
|
# Track known error keys so we can detect truly new ones
|
|
|
|
|
|
self._known_errors: set = set()
|
|
|
|
|
|
self._first_poll_done = False
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
|
|
if self._running:
|
|
|
|
|
|
return
|
|
|
|
|
|
self._running = True
|
2026-02-21 17:23:03 +01:00
|
|
|
|
self._load_last_notified()
|
2026-02-18 17:24:26 +01:00
|
|
|
|
self._thread = threading.Thread(target=self._poll_loop, daemon=True,
|
|
|
|
|
|
name='polling-collector')
|
|
|
|
|
|
self._thread.start()
|
|
|
|
|
|
|
|
|
|
|
|
def stop(self):
|
|
|
|
|
|
self._running = False
|
|
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
# ── Main loop ──────────────────────────────────────────────
|
|
|
|
|
|
|
2026-02-18 17:24:26 +01:00
|
|
|
|
def _poll_loop(self):
|
|
|
|
|
|
"""Main polling loop."""
|
|
|
|
|
|
# Initial delay to let health monitor warm up
|
2026-02-21 17:23:03 +01:00
|
|
|
|
for _ in range(15):
|
2026-02-18 17:24:26 +01:00
|
|
|
|
if not self._running:
|
|
|
|
|
|
return
|
|
|
|
|
|
time.sleep(1)
|
|
|
|
|
|
|
|
|
|
|
|
while self._running:
|
|
|
|
|
|
try:
|
2026-02-21 17:23:03 +01:00
|
|
|
|
self._check_persistent_health()
|
2026-02-18 17:24:26 +01:00
|
|
|
|
self._check_updates()
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[PollingCollector] Error: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
for _ in range(self._poll_interval):
|
|
|
|
|
|
if not self._running:
|
|
|
|
|
|
return
|
|
|
|
|
|
time.sleep(1)
|
|
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
# ── Health errors (independent of suppression) ─────────────
|
|
|
|
|
|
|
|
|
|
|
|
def _check_persistent_health(self):
|
|
|
|
|
|
"""Read ALL active errors from health_persistence and decide
|
|
|
|
|
|
whether each one warrants a notification right now.
|
|
|
|
|
|
|
|
|
|
|
|
Rules:
|
|
|
|
|
|
- A *new* error (not in _known_errors) -> notify immediately
|
|
|
|
|
|
- A *persistent* error already notified -> re-notify after 24 h
|
|
|
|
|
|
- Uses its own tracking, NOT the health monitor's needs_notification flag
|
|
|
|
|
|
"""
|
2026-02-18 17:24:26 +01:00
|
|
|
|
try:
|
|
|
|
|
|
from health_persistence import health_persistence
|
2026-02-21 17:23:03 +01:00
|
|
|
|
errors = health_persistence.get_active_errors()
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
return
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[PollingCollector] get_active_errors failed: {e}")
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
now = time.time()
|
|
|
|
|
|
current_keys = set()
|
|
|
|
|
|
|
|
|
|
|
|
for error in errors:
|
|
|
|
|
|
error_key = error.get('error_key', '')
|
|
|
|
|
|
if not error_key:
|
|
|
|
|
|
continue
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
current_keys.add(error_key)
|
|
|
|
|
|
category = error.get('category', '')
|
|
|
|
|
|
severity = error.get('severity', 'WARNING')
|
|
|
|
|
|
reason = error.get('reason', '')
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
# Determine if we should notify
|
|
|
|
|
|
is_new = error_key not in self._known_errors and self._first_poll_done
|
|
|
|
|
|
last_sent = self._last_notified.get(error_key, 0)
|
|
|
|
|
|
is_due = (now - last_sent) >= self.DIGEST_INTERVAL
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
if not is_new and not is_due:
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
# Map to our event type
|
|
|
|
|
|
event_type = self._CATEGORY_TO_EVENT_TYPE.get(category, 'system_problem')
|
|
|
|
|
|
entity, eid = self._ENTITY_MAP.get(category, ('node', ''))
|
|
|
|
|
|
|
|
|
|
|
|
data = {
|
|
|
|
|
|
'hostname': self._hostname,
|
|
|
|
|
|
'category': category,
|
|
|
|
|
|
'reason': reason,
|
|
|
|
|
|
'error_key': error_key,
|
|
|
|
|
|
'severity': severity,
|
|
|
|
|
|
'first_seen': error.get('first_seen', ''),
|
|
|
|
|
|
'last_seen': error.get('last_seen', ''),
|
|
|
|
|
|
'is_persistent': not is_new,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# Include extra details if present
|
|
|
|
|
|
details = error.get('details')
|
|
|
|
|
|
if isinstance(details, dict):
|
|
|
|
|
|
data.update(details)
|
|
|
|
|
|
elif isinstance(details, str):
|
|
|
|
|
|
try:
|
|
|
|
|
|
data.update(json.loads(details))
|
|
|
|
|
|
except (json.JSONDecodeError, TypeError):
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
self._queue.put(NotificationEvent(
|
|
|
|
|
|
event_type, severity, data, source='health',
|
|
|
|
|
|
entity=entity, entity_id=eid or error_key,
|
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
|
|
# Track that we notified
|
|
|
|
|
|
self._last_notified[error_key] = now
|
|
|
|
|
|
self._persist_last_notified(error_key, now)
|
|
|
|
|
|
|
|
|
|
|
|
# Remove tracking for errors that resolved
|
|
|
|
|
|
resolved = self._known_errors - current_keys
|
|
|
|
|
|
for key in resolved:
|
|
|
|
|
|
self._last_notified.pop(key, None)
|
|
|
|
|
|
|
|
|
|
|
|
self._known_errors = current_keys
|
|
|
|
|
|
self._first_poll_done = True
|
|
|
|
|
|
|
|
|
|
|
|
# ── Update check (enriched) ────────────────────────────────
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
|
|
|
|
|
def _check_updates(self):
|
2026-02-21 17:23:03 +01:00
|
|
|
|
"""Check for available system updates every 24 h.
|
|
|
|
|
|
|
|
|
|
|
|
Enriched output: total count, security updates, PVE version hint,
|
|
|
|
|
|
and top package names.
|
|
|
|
|
|
"""
|
2026-02-18 17:24:26 +01:00
|
|
|
|
now = time.time()
|
2026-02-21 17:23:03 +01:00
|
|
|
|
if now - self._last_update_check < self.UPDATE_CHECK_INTERVAL:
|
2026-02-18 17:24:26 +01:00
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
self._last_update_check = now
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
result = subprocess.run(
|
|
|
|
|
|
['apt-get', '-s', 'upgrade'],
|
2026-02-21 17:23:03 +01:00
|
|
|
|
capture_output=True, text=True, timeout=60,
|
2026-02-18 17:24:26 +01:00
|
|
|
|
)
|
2026-02-21 17:23:03 +01:00
|
|
|
|
if result.returncode != 0:
|
|
|
|
|
|
return
|
2026-02-18 17:24:26 +01:00
|
|
|
|
|
2026-02-21 17:23:03 +01:00
|
|
|
|
lines = [l for l in result.stdout.split('\n') if l.startswith('Inst ')]
|
|
|
|
|
|
total = len(lines)
|
|
|
|
|
|
if total == 0:
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
packages = [l.split()[1] for l in lines]
|
|
|
|
|
|
security = [p for p in packages if any(
|
|
|
|
|
|
kw in p.lower() for kw in ('security', 'cve', 'openssl', 'libssl')
|
|
|
|
|
|
)]
|
|
|
|
|
|
|
|
|
|
|
|
# Also detect security updates via apt changelog / Debian-Security origin
|
|
|
|
|
|
sec_result = subprocess.run(
|
|
|
|
|
|
['apt-get', '-s', 'upgrade', '-o', 'Dir::Etc::SourceList=/dev/null',
|
|
|
|
|
|
'-o', 'Dir::Etc::SourceParts=/dev/null'],
|
|
|
|
|
|
capture_output=True, text=True, timeout=30,
|
|
|
|
|
|
)
|
|
|
|
|
|
# Count lines from security repo (rough heuristic)
|
|
|
|
|
|
sec_count = max(len(security), 0)
|
|
|
|
|
|
try:
|
|
|
|
|
|
sec_output = subprocess.run(
|
|
|
|
|
|
['apt-get', '-s', '--only-upgrade', 'install'] + packages[:50],
|
|
|
|
|
|
capture_output=True, text=True, timeout=30,
|
|
|
|
|
|
)
|
|
|
|
|
|
for line in sec_output.stdout.split('\n'):
|
|
|
|
|
|
if 'security' in line.lower() and 'Inst ' in line:
|
|
|
|
|
|
sec_count += 1
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
# Check for PVE version upgrade
|
|
|
|
|
|
pve_packages = [p for p in packages if 'pve-' in p.lower() or 'proxmox-' in p.lower()]
|
|
|
|
|
|
|
|
|
|
|
|
# Build display details
|
|
|
|
|
|
top_pkgs = packages[:8]
|
|
|
|
|
|
details = ', '.join(top_pkgs)
|
|
|
|
|
|
if total > 8:
|
|
|
|
|
|
details += f', ... +{total - 8} more'
|
|
|
|
|
|
|
|
|
|
|
|
data = {
|
|
|
|
|
|
'hostname': self._hostname,
|
|
|
|
|
|
'count': str(total),
|
|
|
|
|
|
'security_count': str(sec_count),
|
|
|
|
|
|
'details': details,
|
|
|
|
|
|
'packages': ', '.join(packages[:20]),
|
|
|
|
|
|
}
|
|
|
|
|
|
if pve_packages:
|
|
|
|
|
|
data['pve_packages'] = ', '.join(pve_packages)
|
|
|
|
|
|
|
|
|
|
|
|
self._queue.put(NotificationEvent(
|
|
|
|
|
|
'update_available', 'INFO', data,
|
|
|
|
|
|
source='polling', entity='node', entity_id='',
|
|
|
|
|
|
))
|
2026-02-18 17:24:26 +01:00
|
|
|
|
except Exception:
|
2026-02-21 17:23:03 +01:00
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
# ── Persistence helpers ────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
def _load_last_notified(self):
|
|
|
|
|
|
"""Load per-error notification timestamps from DB on startup."""
|
|
|
|
|
|
try:
|
|
|
|
|
|
db_path = Path('/usr/local/share/proxmenux/health_monitor.db')
|
|
|
|
|
|
if not db_path.exists():
|
|
|
|
|
|
return
|
|
|
|
|
|
conn = sqlite3.connect(str(db_path), timeout=10)
|
|
|
|
|
|
conn.execute('PRAGMA journal_mode=WAL')
|
|
|
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
cursor.execute(
|
|
|
|
|
|
"SELECT fingerprint, last_sent_ts FROM notification_last_sent "
|
|
|
|
|
|
"WHERE fingerprint LIKE 'health_%'"
|
|
|
|
|
|
)
|
|
|
|
|
|
for fp, ts in cursor.fetchall():
|
|
|
|
|
|
error_key = fp.replace('health_', '', 1)
|
|
|
|
|
|
self._last_notified[error_key] = ts
|
|
|
|
|
|
self._known_errors.add(error_key)
|
|
|
|
|
|
conn.close()
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[PollingCollector] Failed to load last_notified: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _persist_last_notified(self, error_key: str, ts: float):
|
|
|
|
|
|
"""Save per-error notification timestamp to DB."""
|
|
|
|
|
|
try:
|
|
|
|
|
|
db_path = Path('/usr/local/share/proxmenux/health_monitor.db')
|
|
|
|
|
|
conn = sqlite3.connect(str(db_path), timeout=10)
|
|
|
|
|
|
conn.execute('PRAGMA journal_mode=WAL')
|
|
|
|
|
|
conn.execute('PRAGMA busy_timeout=5000')
|
|
|
|
|
|
fp = f'health_{error_key}'
|
|
|
|
|
|
conn.execute('''
|
|
|
|
|
|
INSERT OR REPLACE INTO notification_last_sent (fingerprint, last_sent_ts, count)
|
|
|
|
|
|
VALUES (?, ?, COALESCE(
|
|
|
|
|
|
(SELECT count + 1 FROM notification_last_sent WHERE fingerprint = ?), 1
|
|
|
|
|
|
))
|
|
|
|
|
|
''', (fp, int(ts), fp))
|
|
|
|
|
|
conn.commit()
|
|
|
|
|
|
conn.close()
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
2026-02-19 17:02:02 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ─── Proxmox Webhook Receiver ───────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
class ProxmoxHookWatcher:
    """Receives native Proxmox VE notifications via local webhook endpoint.

    Configured automatically via /etc/pve/notifications.cfg (endpoint +
    matcher blocks). The setup-webhook API writes these blocks on first
    enable. See flask_notification_routes.py for details.

    Payload varies by source (storage, replication, cluster, PBS, apt).
    This class normalizes them into NotificationEvent objects.
    """

    def __init__(self, event_queue: Queue):
        self._queue = event_queue
        self._hostname = _hostname()
        # Cross-source dedup ledger: "<event_type>:<entity_id>" -> delivery ts.
        # Presumably TaskWatcher holds a reference to this dict (see the
        # pruning comment in process_webhook), so it is only ever mutated
        # in place — never reassigned.
        # Fix: this attribute was never initialized, so every
        # process_webhook() call raised AttributeError at the dedup step.
        self._delivered: Dict[str, float] = {}

    def process_webhook(self, payload: dict) -> dict:
        """Process an incoming Proxmox webhook payload.

        The PVE webhook is the PRIMARY source for vzdump, replication,
        fencing, package-updates and system-mail events. PVE sends rich
        detail (full logs, sizes, durations) that TaskWatcher cannot match.

        Body template delivers:
        {title, message, severity, timestamp, fields: {type, hostname, job-id}}

        Returns: {'accepted': bool, 'event_type': str, 'event_id': str}
        (or {'accepted': False, ...} for empty payloads / filtered meta-events)
        """
        if not payload:
            return {'accepted': False, 'error': 'Empty payload'}

        # ── Extract structured PVE fields ──
        fields = payload.get('fields') or {}
        if isinstance(fields, str):
            # Edge case: {{ json fields }} rendered as string instead of dict
            try:
                fields = json.loads(fields)
            except (json.JSONDecodeError, ValueError):
                fields = {}
        if not isinstance(fields, dict):
            # Decoded JSON was a list/scalar — treat as absent.
            fields = {}

        # str() guards keep non-string webhook values from crashing us.
        pve_type = str(fields.get('type', '') or '').lower().strip()
        pve_hostname = fields.get('hostname', self._hostname)
        pve_job_id = fields.get('job-id', '')

        title = payload.get('title', '')
        message = payload.get('message', payload.get('body', ''))
        severity_raw = str(payload.get('severity', 'info') or 'info').lower().strip()

        # ── Classify by PVE type (direct, no heuristics needed) ──
        event_type, entity, entity_id = self._classify_pve(
            pve_type, severity_raw, title, message
        )

        # Discard meta-events
        if event_type == '_skip':
            return {'accepted': False, 'skipped': True, 'reason': 'Meta-event filtered'}

        severity = self._map_severity(severity_raw)

        # ── Build rich data dict ──
        # For webhook events, PVE's `message` IS the notification body.
        # It contains full vzdump logs, package lists, error details, etc.
        # We pass it as 'pve_message' so templates can use it directly.
        data = {
            'hostname': pve_hostname,
            'pve_type': pve_type,
            'pve_message': message,
            'pve_title': title,
            'title': title,
            'job_id': pve_job_id,
        }

        # Extract VMID, VM name, size and duration from vzdump messages.
        if pve_type == 'vzdump' and message:
            # PVE vzdump messages contain lines like:
            # "INFO: Starting Backup of VM 100 (qemu)"
            # "VMID Name Status Time Size Filename"
            # "100 arch-linux OK 00:05:30 1.2G /path/to/file"
            vmids = re.findall(r'(?:VM|CT)\s+(\d+)', message, re.IGNORECASE)
            if vmids:
                data['vmid'] = vmids[0]
                entity_id = vmids[0]
            # VM name from the summary-table line.
            name_m = re.search(r'(\d+)\s+(\S+)\s+(?:OK|ERROR|WARNINGS)', message)
            if name_m:
                data['vmname'] = name_m.group(2)
            # "Total size: X"
            size_m = re.search(r'Total size:\s*(.+?)(?:\n|$)', message)
            if size_m:
                data['size'] = size_m.group(1).strip()
            # "Total running time: X"
            dur_m = re.search(r'Total running time:\s*(.+?)(?:\n|$)', message)
            if dur_m:
                data['duration'] = dur_m.group(1).strip()

        # Record this event for cross-source dedup.
        # TaskWatcher iterates this dict checking if any key with the same
        # event_type prefix was delivered recently (within 120s).
        self._delivered[f"{event_type}:{entity_id}"] = time.time()
        # Cleanup old entries (use del, NOT reassign -- TaskWatcher holds a ref)
        if len(self._delivered) > 200:
            cutoff = time.time() - 300
            stale = [k for k, v in self._delivered.items() if v < cutoff]
            for k in stale:
                del self._delivered[k]

        event = NotificationEvent(
            event_type=event_type,
            severity=severity,
            data=data,
            source='proxmox_hook',
            entity=entity,
            entity_id=entity_id,
            raw=payload,
        )

        self._queue.put(event)
        return {'accepted': True, 'event_type': event_type, 'event_id': event.event_id}

    def _classify_pve(self, pve_type: str, severity: str,
                      title: str, message: str) -> tuple:
        """Classify using PVE's structured fields.type.

        Returns (event_type, entity, entity_id). The sentinel event_type
        '_skip' tells the caller to drop the event entirely.
        """
        title_lower = (title or '').lower()

        # Skip overall/updates status change meta-events
        if 'overall' in title_lower and ('changed' in title_lower or 'status' in title_lower):
            return '_skip', '', ''
        if 'updates' in title_lower and ('changed' in title_lower or 'status' in title_lower):
            return '_skip', '', ''

        # ── Direct classification by PVE type ──
        if pve_type == 'vzdump':
            if severity in ('error', 'err'):
                return 'backup_fail', 'vm', ''
            return 'backup_complete', 'vm', ''

        if pve_type == 'fencing':
            return 'split_brain', 'node', ''

        if pve_type == 'replication':
            return 'replication_fail', 'vm', ''

        if pve_type == 'package-updates':
            return 'update_available', 'node', ''

        if pve_type == 'system-mail':
            return 'system_mail', 'node', ''

        # ── Fallback for unknown/empty pve_type ──
        # (e.g. test notifications, future PVE event types)
        msg_lower = (message or '').lower()
        text = f"{title_lower} {msg_lower}"

        if 'vzdump' in text or 'backup' in text:
            m = re.search(r'(?:vm|ct)\s+(\d+)', text, re.IGNORECASE)
            vmid = m.group(1) if m else ''
            if any(w in text for w in ('fail', 'error')):
                return 'backup_fail', 'vm', vmid
            return 'backup_complete', 'vm', vmid

        if 'replication' in text:
            return 'replication_fail', 'vm', ''

        # Generic fallback
        return 'system_problem', 'node', ''

    @staticmethod
    def _map_severity(raw: str) -> str:
        """Map a PVE severity string onto INFO / WARNING / CRITICAL."""
        raw_l = str(raw).lower()
        if raw_l in ('critical', 'emergency', 'alert', 'crit', 'err', 'error'):
            return 'CRITICAL'
        if raw_l in ('warning', 'warn', 'notice'):
            return 'WARNING'
        return 'INFO'
|