Update notification service

This commit is contained in:
MacRimi
2026-02-21 22:36:58 +01:00
parent e3f7e8c97a
commit 507f769357
2 changed files with 16 additions and 40 deletions

View File

@@ -1015,6 +1015,22 @@ class ProxmoxHookWatcher:
if key not in _reserved and key not in data:
data[key] = str(val) if val is not None else ''
# Skip event types that the TaskWatcher already captures with
# full details (VMID, VM name, size, duration, etc.).
# The PVE webhook sends inferior data for these (no VMID, no name).
# Only forward events that tasks/journal do NOT catch reliably.
_TASKS_HANDLED = {
'backup_complete', 'backup_fail', 'backup_start',
'snapshot_complete', 'snapshot_fail',
'vm_start', 'vm_stop', 'vm_restart', 'vm_shutdown',
'ct_start', 'ct_stop',
'migration_start', 'migration_complete', 'migration_fail',
'replication_complete', 'replication_fail',
}
if event_type in _TASKS_HANDLED:
return {'accepted': False, 'skipped': True,
'reason': f'tasks_watcher_handles_{event_type}'}
event = NotificationEvent(
event_type=event_type,
severity=severity,
@@ -1025,31 +1041,6 @@ class ProxmoxHookWatcher:
raw=payload,
)
# Cross-source dedup: if a tasks/journal watcher already emitted
# the same event_type+entity_id within 60s, skip the webhook copy.
# The fingerprint is hostname:entity:entity_id:event_type (source-agnostic).
import time
dedup_key = f"{self._hostname}:{entity}:{entity_id}:{event_type}"
now = time.time()
if not hasattr(self, '_webhook_dedup'):
self._webhook_dedup = {}
last_seen = self._webhook_dedup.get(dedup_key, 0)
if now - last_seen < 60:
return {'accepted': False, 'skipped': True, 'reason': 'duplicate_within_60s'}
self._webhook_dedup[dedup_key] = now
# Also check the pipeline's global dedup (from other sources)
if hasattr(self, '_pipeline') and hasattr(self._pipeline, '_recent_fingerprints'):
if dedup_key in self._pipeline._recent_fingerprints:
fp_time = self._pipeline._recent_fingerprints[dedup_key]
if now - fp_time < 60:
return {'accepted': False, 'skipped': True, 'reason': 'duplicate_cross_source'}
# Cleanup old entries periodically
if len(self._webhook_dedup) > 200:
cutoff = now - 120
self._webhook_dedup = {k: v for k, v in self._webhook_dedup.items() if v > cutoff}
self._queue.put(event)
return {'accepted': True, 'event_type': event_type, 'event_id': event.event_id}