Update notification service

MacRimi
2026-02-19 17:02:02 +01:00
parent 34d04e57dd
commit 7c5cdb9161
7 changed files with 1587 additions and 95 deletions


@@ -39,7 +39,8 @@ from notification_templates import (
EVENT_GROUPS, get_event_types_by_group, get_default_enabled_events
)
from notification_events import (
- JournalWatcher, TaskWatcher, PollingCollector, NotificationEvent
+ JournalWatcher, TaskWatcher, PollingCollector, NotificationEvent,
+ ProxmoxHookWatcher,
)
@@ -50,7 +51,7 @@ SETTINGS_PREFIX = 'notification.'
# Cooldown defaults (seconds)
DEFAULT_COOLDOWNS = {
- 'CRITICAL': 0, # No cooldown for critical
+ 'CRITICAL': 60, # 60s minimum (prevents storm, delivers fast)
'WARNING': 300, # 5 min
'INFO': 900, # 15 min
'resources': 900, # 15 min for resource alerts
@@ -58,6 +59,191 @@ DEFAULT_COOLDOWNS = {
}
# ─── Storm Protection ────────────────────────────────────────────
GROUP_RATE_LIMITS = {
'security': {'max_per_minute': 5, 'max_per_hour': 30},
'storage': {'max_per_minute': 3, 'max_per_hour': 20},
'cluster': {'max_per_minute': 5, 'max_per_hour': 20},
'network': {'max_per_minute': 3, 'max_per_hour': 15},
'resources': {'max_per_minute': 3, 'max_per_hour': 20},
'vm_ct': {'max_per_minute': 10, 'max_per_hour': 60},
'backup': {'max_per_minute': 5, 'max_per_hour': 30},
'system': {'max_per_minute': 5, 'max_per_hour': 30},
}
class GroupRateLimiter:
"""Rate limiter per event group. Prevents notification storms."""
def __init__(self):
from collections import deque
self._deque = deque
self._minute_counts: Dict[str, Any] = {} # group -> deque[timestamp]
self._hour_counts: Dict[str, Any] = {} # group -> deque[timestamp]
def allow(self, group: str) -> bool:
"""Check if group rate limit allows this event."""
limits = GROUP_RATE_LIMITS.get(group, GROUP_RATE_LIMITS['system'])
now = time.time()
# Initialize if needed
if group not in self._minute_counts:
self._minute_counts[group] = self._deque()
self._hour_counts[group] = self._deque()
# Prune old entries
minute_q = self._minute_counts[group]
hour_q = self._hour_counts[group]
while minute_q and now - minute_q[0] > 60:
minute_q.popleft()
while hour_q and now - hour_q[0] > 3600:
hour_q.popleft()
# Check limits
if len(minute_q) >= limits['max_per_minute']:
return False
if len(hour_q) >= limits['max_per_hour']:
return False
# Record
minute_q.append(now)
hour_q.append(now)
return True
def get_stats(self) -> Dict[str, Dict[str, int]]:
"""Return current rate stats per group."""
now = time.time()
stats = {}
for group in self._minute_counts:
minute_q = self._minute_counts.get(group, [])
hour_q = self._hour_counts.get(group, [])
stats[group] = {
'last_minute': sum(1 for t in minute_q if now - t <= 60),
'last_hour': sum(1 for t in hour_q if now - t <= 3600),
}
return stats
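# Usage sketch (assumed caller code, illustrative only): the 'security' group
# is capped at 5 events per minute in GROUP_RATE_LIMITS, so a sixth call
# within the same minute is rejected.
#
#   limiter = GroupRateLimiter()
#   results = [limiter.allow('security') for _ in range(6)]
#   # results == [True, True, True, True, True, False]
#   limiter.get_stats()['security']  # {'last_minute': 5, 'last_hour': 5}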
AGGREGATION_RULES = {
'auth_fail': {'window': 120, 'min_count': 3, 'burst_type': 'burst_auth_fail'},
'ip_block': {'window': 120, 'min_count': 3, 'burst_type': 'burst_ip_block'},
'disk_io_error': {'window': 60, 'min_count': 3, 'burst_type': 'burst_disk_io'},
'split_brain': {'window': 300, 'min_count': 2, 'burst_type': 'burst_cluster'},
'node_disconnect': {'window': 300, 'min_count': 2, 'burst_type': 'burst_cluster'},
}
class BurstAggregator:
"""Accumulates similar events in a time window, then sends a single summary.
Examples:
- "Fail2Ban banned 17 IPs in 2 minutes"
- "Disk I/O errors: 34 events on /dev/sdb in 60s"
"""
def __init__(self):
self._buckets: Dict[str, List] = {} # bucket_key -> [events]
self._deadlines: Dict[str, float] = {} # bucket_key -> flush_deadline
self._lock = threading.Lock()
def ingest(self, event: NotificationEvent) -> Optional[NotificationEvent]:
"""Add event to aggregation. Returns:
- None if event is being buffered (wait for window)
- Original event if not eligible for aggregation
"""
rule = AGGREGATION_RULES.get(event.event_type)
if not rule:
return event # Not aggregable, pass through
bucket_key = f"{event.event_type}:{event.data.get('hostname', '')}"
with self._lock:
if bucket_key not in self._buckets:
self._buckets[bucket_key] = []
self._deadlines[bucket_key] = time.time() + rule['window']
self._buckets[bucket_key].append(event)
# First event in bucket: pass through immediately so user gets fast alert
if len(self._buckets[bucket_key]) == 1:
return event
# Subsequent events: buffer (will be flushed as summary)
return None
def flush_expired(self) -> List[NotificationEvent]:
"""Flush all buckets past their deadline. Returns summary events."""
now = time.time()
summaries = []
with self._lock:
expired_keys = [k for k, d in self._deadlines.items() if now >= d]
for key in expired_keys:
events = self._buckets.pop(key, [])
del self._deadlines[key]
if len(events) < 2:
continue # Single event already sent on ingest, no summary needed
rule_type = key.split(':')[0]
rule = AGGREGATION_RULES.get(rule_type, {})
min_count = rule.get('min_count', 2)
if len(events) < min_count:
continue # Not enough events for a summary
summary = self._create_summary(events, rule)
if summary:
summaries.append(summary)
return summaries
def _create_summary(self, events: List[NotificationEvent],
rule: dict) -> Optional[NotificationEvent]:
"""Create a single summary event from multiple events."""
if not events:
return None
first = events[0]
# Determine highest severity
sev_order = {'INFO': 0, 'WARNING': 1, 'CRITICAL': 2}
max_severity = max(events, key=lambda e: sev_order.get(e.severity, 0)).severity
# Collect unique entity_ids
entity_ids = list(set(e.entity_id for e in events if e.entity_id))
entity_list = ', '.join(entity_ids[:10]) if entity_ids else 'multiple sources'
if len(entity_ids) > 10:
entity_list += f' (+{len(entity_ids) - 10} more)'
# Calculate window
window_secs = events[-1].ts_epoch - events[0].ts_epoch
if window_secs < 120:
window_str = f'{int(window_secs)}s'
else:
window_str = f'{int(window_secs / 60)}m'
burst_type = rule.get('burst_type', 'burst_generic')
data = {
'hostname': first.data.get('hostname', socket.gethostname()),
'count': str(len(events)),
'window': window_str,
'entity_list': entity_list,
'event_type': first.event_type,
}
return NotificationEvent(
event_type=burst_type,
severity=max_severity,
data=data,
source='aggregator',
entity=first.entity,
entity_id='burst',
)
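# Aggregation sketch (assumed caller code; the constructor mirrors the call in
# _create_summary above): three rapid 'auth_fail' events from the same host.
#
#   agg = BurstAggregator()
#   events = [
#       NotificationEvent(event_type='auth_fail', severity='WARNING',
#                         data={'hostname': 'pve1'}, source='journal',
#                         entity='host', entity_id=f'10.0.0.{i}')
#       for i in range(3)
#   ]
#   agg.ingest(events[0])  # returned unchanged: first event alerts immediately
#   agg.ingest(events[1])  # None: buffered into the 120s 'auth_fail' window
#   agg.ingest(events[2])  # None: buffered
#   # After the window expires, the dispatch loop's periodic flush yields one
#   # 'burst_auth_fail' summary covering all three events:
#   agg.flush_expired()    # -> [NotificationEvent(event_type='burst_auth_fail', ...)]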
# ─── Notification Manager ─────────────────────────────────────────
class NotificationManager:
@@ -81,9 +267,17 @@ class NotificationManager:
self._polling_collector: Optional[PollingCollector] = None
self._dispatch_thread: Optional[threading.Thread] = None
- # Cooldown tracking: {event_type_or_key: last_sent_timestamp}
+ # Webhook receiver (no thread, passive)
+ self._hook_watcher: Optional[ProxmoxHookWatcher] = None
+ # Cooldown tracking: {fingerprint: last_sent_timestamp}
self._cooldowns: Dict[str, float] = {}
# Storm protection
self._group_limiter = GroupRateLimiter()
self._aggregator = BurstAggregator()
self._aggregation_thread: Optional[threading.Thread] = None
# Stats
self._stats = {
'started_at': None,
@@ -180,6 +374,7 @@ class NotificationManager:
return
self._load_config()
self._load_cooldowns_from_db()
if not self._enabled:
print("[NotificationManager] Service is disabled. Skipping start.")
@@ -220,19 +415,48 @@ class NotificationManager:
def _dispatch_loop(self):
"""Main dispatch loop: reads queue -> filters -> formats -> sends -> records."""
last_cleanup = time.monotonic()
last_flush = time.monotonic()
cleanup_interval = 3600 # Cleanup cooldowns every hour
flush_interval = 5 # Flush aggregation buckets every 5s
while self._running:
try:
event = self._event_queue.get(timeout=2)
except Empty:
# Periodic maintenance during idle
now_mono = time.monotonic()
if now_mono - last_cleanup > cleanup_interval:
self._cleanup_old_cooldowns()
last_cleanup = now_mono
# Flush expired aggregation buckets
if now_mono - last_flush > flush_interval:
self._flush_aggregation()
last_flush = now_mono
continue
try:
self._process_event(event)
except Exception as e:
print(f"[NotificationManager] Dispatch error: {e}")
# Also flush aggregation after each event
if time.monotonic() - last_flush > flush_interval:
self._flush_aggregation()
last_flush = time.monotonic()
def _flush_aggregation(self):
"""Flush expired aggregation buckets and dispatch summaries."""
try:
summaries = self._aggregator.flush_expired()
for summary_event in summaries:
# Burst summaries bypass aggregator but still pass cooldown + rate limit
self._process_event_direct(summary_event)
except Exception as e:
print(f"[NotificationManager] Aggregation flush error: {e}")
def _process_event(self, event: NotificationEvent):
"""Process a single event from the queue."""
"""Process a single event: filter -> aggregate -> cooldown -> rate limit -> dispatch."""
if not self._enabled:
return
@@ -246,14 +470,43 @@ class NotificationManager:
if not self._meets_severity(event.severity, min_severity):
return
# Try aggregation (may buffer the event)
result = self._aggregator.ingest(event)
if result is None:
return # Buffered, will be flushed as summary later
event = result # Use original event (first in burst passes through)
# From here, proceed with dispatch (shared with _process_event_direct)
self._dispatch_event(event)
def _process_event_direct(self, event: NotificationEvent):
"""Process a burst summary event. Bypasses aggregator but applies all other filters."""
if not self._enabled:
return
# Check severity filter
min_severity = self._config.get('filter.min_severity', 'INFO')
if not self._meets_severity(event.severity, min_severity):
return
self._dispatch_event(event)
def _dispatch_event(self, event: NotificationEvent):
"""Shared dispatch pipeline: cooldown -> rate limit -> render -> send."""
# Check cooldown
if not self._check_cooldown(event):
return
- # Render message from template
+ # Check group rate limit
+ template = TEMPLATES.get(event.event_type, {})
+ group = template.get('group', 'system')
+ if not self._group_limiter.allow(group):
+ return
+ # Render message from template (structured output)
rendered = render_template(event.event_type, event.data)
- # Optional AI enhancement
+ # Optional AI enhancement (on text body only)
ai_config = {
'enabled': self._config.get('ai_enabled', 'false'),
'provider': self._config.get('ai_provider', ''),
@@ -264,10 +517,15 @@ class NotificationManager:
rendered['title'], rendered['body'], rendered['severity'], ai_config
)
# Enrich data with structured fields for channels that support them
enriched_data = dict(event.data)
enriched_data['_rendered_fields'] = rendered.get('fields', [])
enriched_data['_body_html'] = rendered.get('body_html', '')
# Send through all active channels
self._dispatch_to_channels(
rendered['title'], body, rendered['severity'],
- event.event_type, event.data, event.source
+ event.event_type, enriched_data, event.source
)
def _dispatch_to_channels(self, title: str, body: str, severity: str,
@@ -323,20 +581,67 @@ class NotificationManager:
else:
cooldown = DEFAULT_COOLDOWNS.get(event.severity, 300)
- # CRITICAL events have zero cooldown by default
+ # CRITICAL events: 60s minimum cooldown (prevents storm, but delivers fast)
if event.severity == 'CRITICAL' and cooldown_str is None:
- cooldown = 0
+ cooldown = 60
- # Check against last sent time
- dedup_key = f"{event.event_type}:{event.data.get('category', '')}:{event.data.get('vmid', '')}"
- last_sent = self._cooldowns.get(dedup_key, 0)
+ # Check against last sent time using stable fingerprint
+ last_sent = self._cooldowns.get(event.fingerprint, 0)
if now - last_sent < cooldown:
return False
- self._cooldowns[dedup_key] = now
+ self._cooldowns[event.fingerprint] = now
+ self._persist_cooldown(event.fingerprint, now)
return True
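# Worked example (illustrative figures): two events sharing a fingerprint,
# the second arriving 120s after the first.
#   WARNING  -> cooldown 300s -> 120 < 300 -> second event suppressed
#   CRITICAL -> cooldown 60s  -> 120 > 60  -> second event still delivered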
def _load_cooldowns_from_db(self):
"""Load persistent cooldown state from SQLite (up to 48h)."""
try:
if not DB_PATH.exists():
return
conn = sqlite3.connect(str(DB_PATH), timeout=10)
conn.execute('PRAGMA journal_mode=WAL')
cursor = conn.cursor()
cursor.execute('SELECT fingerprint, last_sent_ts FROM notification_last_sent')
now = time.time()
for fp, ts in cursor.fetchall():
if now - ts < 172800: # 48h window
self._cooldowns[fp] = ts
conn.close()
except Exception as e:
print(f"[NotificationManager] Failed to load cooldowns: {e}")
def _persist_cooldown(self, fingerprint: str, ts: float):
"""Save cooldown timestamp to SQLite for restart persistence."""
try:
conn = sqlite3.connect(str(DB_PATH), timeout=10)
conn.execute('PRAGMA journal_mode=WAL')
conn.execute('PRAGMA busy_timeout=5000')
conn.execute('''
INSERT OR REPLACE INTO notification_last_sent (fingerprint, last_sent_ts, count)
VALUES (?, ?, COALESCE(
(SELECT count + 1 FROM notification_last_sent WHERE fingerprint = ?), 1
))
''', (fingerprint, int(ts), fingerprint))
conn.commit()
conn.close()
except Exception:
pass # Non-critical, in-memory cooldown still works
def _cleanup_old_cooldowns(self):
"""Remove cooldown entries older than 48h from both memory and DB."""
cutoff = time.time() - 172800 # 48h
self._cooldowns = {k: v for k, v in self._cooldowns.items() if v > cutoff}
try:
conn = sqlite3.connect(str(DB_PATH), timeout=10)
conn.execute('PRAGMA journal_mode=WAL')
conn.execute('DELETE FROM notification_last_sent WHERE last_sent_ts < ?', (int(cutoff),))
conn.commit()
conn.close()
except Exception:
pass
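# The notification_last_sent table is assumed to be created elsewhere (its
# creation is not shown in this hunk); a schema consistent with the
# SELECT/INSERT OR REPLACE/DELETE statements above would be:
#
#   CREATE TABLE IF NOT EXISTS notification_last_sent (
#       fingerprint   TEXT PRIMARY KEY,
#       last_sent_ts  INTEGER NOT NULL,
#       count         INTEGER NOT NULL DEFAULT 1
#   );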
@staticmethod
def _meets_severity(event_severity: str, min_severity: str) -> bool:
"""Check if event severity meets the minimum threshold."""
@@ -487,6 +792,31 @@ class NotificationManager:
'results': results,
}
# ─── Proxmox Webhook ──────────────────────────────────────────
def process_webhook(self, payload: dict) -> dict:
"""Process incoming Proxmox webhook. Delegates to ProxmoxHookWatcher."""
if not self._hook_watcher:
self._hook_watcher = ProxmoxHookWatcher(self._event_queue)
return self._hook_watcher.process_webhook(payload)
def get_webhook_secret(self) -> str:
"""Get configured webhook secret, or empty string if none."""
if not self._config:
self._load_config()
return self._config.get('webhook_secret', '')
def get_webhook_allowed_ips(self) -> list:
"""Get list of allowed IPs for webhook, or empty list (allow all)."""
if not self._config:
self._load_config()
raw = self._config.get('webhook_allowed_ips', '')
if not raw:
return []
return [ip.strip() for ip in str(raw).split(',') if ip.strip()]
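# Sketch of how an HTTP layer might gate an incoming webhook using the three
# helpers above (assumed caller code; the handler name and arguments are
# illustrative):
#
#   def handle_webhook_request(manager, payload, client_ip, provided_secret):
#       allowed = manager.get_webhook_allowed_ips()
#       if allowed and client_ip not in allowed:
#           return {'ok': False, 'error': 'ip_not_allowed'}
#       secret = manager.get_webhook_secret()
#       if secret and provided_secret != secret:
#           return {'ok': False, 'error': 'bad_secret'}
#       return manager.process_webhook(payload)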
# ─── Status & Settings ──────────────────────────────────────
def get_status(self) -> Dict[str, Any]:
"""Get current service status."""
if not self._config:
@@ -618,6 +948,8 @@ class NotificationManager:
'event_groups': EVENT_GROUPS,
'event_types': get_event_types_by_group(),
'default_events': get_default_enabled_events(),
'webhook_secret': self._config.get('webhook_secret', ''),
'webhook_allowed_ips': self._config.get('webhook_allowed_ips', ''),
}
def save_settings(self, settings: Dict[str, str]) -> Dict[str, Any]: