Update notification service

This commit is contained in:
MacRimi
2026-02-28 19:18:13 +01:00
parent 5e9ef37646
commit c5354d014c
7 changed files with 598 additions and 38 deletions

View File

@@ -302,6 +302,10 @@ class JournalWatcher:
lib_match = re.search(r'\bin\s+(\S+)', msg)
lib_name = lib_match.group(1) if lib_match else ''
# Dedup by process name so repeated segfaults don't spam
if proc_name:
entity_id = f'segfault_{proc_name}'
parts = [reason]
if proc_name:
parts.append(f"Process: {proc_name}" + (f" (PID {proc_pid})" if proc_pid else ''))
@@ -313,9 +317,48 @@ class JournalWatcher:
m = re.search(r'Killed process\s+(\d+)\s+\(([^)]+)\)', msg)
if m:
enriched = f"{reason}\nKilled: {m.group(2)} (PID {m.group(1)})"
entity_id = f'oom_{m.group(2)}' # Dedup by killed process
else:
enriched = f"{reason}\n{msg[:300]}"
elif re.search(r'EXT4-fs error|BTRFS error|XFS.*error|ZFS.*error', msg, re.IGNORECASE):
# Filesystem errors: extract device, function and human-readable explanation
fs_type = 'EXT4'
for fs in ['EXT4', 'BTRFS', 'XFS', 'ZFS']:
if fs.lower() in msg.lower():
fs_type = fs
break
dev_match = re.search(r'device\s+(\S+?)\)?:', msg)
device = dev_match.group(1).rstrip(')') if dev_match else 'unknown'
# Dedup by device: all EXT4 errors on sdb1 share ONE notification
entity = 'disk'
entity_id = f'fs_{device}'
# Identify what this device is (model, type, mountpoint)
device_info = self._identify_block_device(device)
func_match = re.search(r':\s+(\w+:\d+):', msg)
func_info = func_match.group(1) if func_match else ''
inode_match = re.search(r'inode\s+#?(\d+)', msg)
inode = inode_match.group(1) if inode_match else ''
parts = [f'{fs_type} filesystem corruption on /dev/{device}']
# Add device identification so the user knows what this device is
if device_info:
parts.append(f'Device: {device_info}')
else:
parts.append(f'Device: /dev/{device} (not currently detected -- may be a disconnected USB or temporary device)')
if func_info:
parts.append(f'Error: {self._translate_fs_function(func_info)}')
if inode:
inode_hint = 'root directory' if inode == '2' else f'inode #{inode}'
parts.append(f'Affected: {inode_hint}')
parts.append(f'Action: Run "fsck /dev/{device}" (unmount first) or check backup integrity')
enriched = '\n'.join(parts)
else:
# Generic: include the raw journal message for context
enriched = f"{reason}\n{msg[:300]}"
@@ -325,6 +368,92 @@ class JournalWatcher:
self._emit(event_type, severity, data, entity=entity, entity_id=entity_id)
return
def _identify_block_device(self, device: str) -> str:
"""
Identify a block device by querying lsblk.
Returns a human-readable string like:
"KINGSTON SA400S37960G (SSD, 894.3G) mounted at /mnt/data"
"ST8000VN004-3CP101 (HDD, 7.3T) -- not mounted"
Returns empty string if the device is not found.
"""
if not device or device == 'unknown':
return ''
try:
# Try the device as-is first, then the base disk (sdb1 -> sdb)
candidates = [device]
base = re.sub(r'\d+$', '', device) if not ('nvme' in device or 'mmcblk' in device) else device
if base != device:
candidates.append(base)
for dev in candidates:
dev_path = f'/dev/{dev}' if not dev.startswith('/') else dev
result = subprocess.run(
['lsblk', '-ndo', 'NAME,MODEL,SIZE,TRAN,MOUNTPOINT,ROTA', dev_path],
capture_output=True, text=True, timeout=3
)
if result.returncode == 0 and result.stdout.strip():
fields = result.stdout.strip().split(None, 5)
name = fields[0] if len(fields) > 0 else dev
model = fields[1] if len(fields) > 1 and fields[1] else 'Unknown model'
size = fields[2] if len(fields) > 2 else '?'
tran = (fields[3] if len(fields) > 3 else '').upper() # sata, usb, nvme
mountpoint = fields[4] if len(fields) > 4 and fields[4] else ''
rota = fields[5].strip() if len(fields) > 5 else '1'
# Determine disk type
if tran == 'USB':
disk_type = 'USB'
elif tran == 'NVME' or 'nvme' in name:
disk_type = 'NVMe'
elif rota == '0':
disk_type = 'SSD'
else:
disk_type = 'HDD'
info = f'{model} ({disk_type}, {size})'
if mountpoint:
info += f' mounted at {mountpoint}'
elif dev != device:
# Check partition mountpoint
part_result = subprocess.run(
['lsblk', '-ndo', 'MOUNTPOINT', f'/dev/{device}'],
capture_output=True, text=True, timeout=2
)
part_mount = part_result.stdout.strip() if part_result.returncode == 0 else ''
if part_mount:
info += f' partition {device} mounted at {part_mount}'
else:
info += ' -- not mounted'
else:
info += ' -- not mounted'
return info
return ''
except Exception:
return ''
@staticmethod
def _translate_fs_function(func_info: str) -> str:
"""Translate EXT4/filesystem function names to plain language."""
func_name = func_info.split(':')[0] if ':' in func_info else func_info
translations = {
'ext4_find_entry': 'directory lookup failed (possible directory corruption)',
'ext4_lookup': 'file lookup failed (possible metadata corruption)',
'ext4_journal_start': 'journal transaction failed (journal corruption)',
'ext4_readdir': 'directory read failed (directory data corrupted)',
'ext4_get_inode_loc': 'inode location failed (inode table corruption)',
'__ext4_get_inode_loc': 'inode location failed (inode table corruption)',
'ext4_xattr_get': 'extended attributes read failed',
'ext4_iget': 'inode read failed (possible inode corruption)',
'ext4_mb_generate_buddy': 'block allocator error',
'ext4_validate_block_bitmap': 'block bitmap corrupted',
'ext4_validate_inode_bitmap': 'inode bitmap corrupted',
'htree_dirblock_to_tree': 'directory index tree corrupted',
}
desc = translations.get(func_name, func_name)
return desc
def _check_service_failure(self, msg: str, unit: str):
"""Detect critical service failures with enriched context."""
# Filter out noise -- these are normal systemd transient units,
@@ -405,7 +534,16 @@ class JournalWatcher:
return ''
def _check_disk_io(self, msg: str, syslog_id: str, priority: int):
"""Detect disk I/O errors from kernel messages."""
"""
Detect disk I/O errors from kernel messages.
Cross-references SMART health before notifying:
- SMART PASSED -> no notification (transient controller event)
- SMART FAILED/UNKNOWN -> notify with enriched context
Resolves ATA controller names to physical devices and identifies
the disk model/type/mountpoint for the user.
"""
if syslog_id != 'kernel' and priority > 3:
return
@@ -413,20 +551,144 @@ class JournalWatcher:
r'blk_update_request: I/O error.*dev (\S+)',
r'Buffer I/O error on device (\S+)',
r'SCSI error.*sd(\w)',
r'ata\d+.*error',
r'(ata\d+)[\.\d]*:.*error',
]
for pattern in io_patterns:
match = re.search(pattern, msg)
if match:
device = match.group(1) if match.lastindex else 'unknown'
raw_device = match.group(1) if match.lastindex else 'unknown'
# Resolve ATA port to physical disk name
if raw_device.startswith('ata'):
resolved = self._resolve_ata_to_disk(raw_device)
else:
# Strip partition number (sdb1 -> sdb)
resolved = re.sub(r'\d+$', '', raw_device) if raw_device.startswith('sd') else raw_device
# Check SMART health -- if disk is healthy, this is transient noise
smart_health = self._quick_smart_health(resolved)
if smart_health == 'PASSED':
# SMART says disk is fine, don't notify for transient ATA/SCSI events
return
# SMART is FAILED or UNKNOWN -- this may be a real problem
device_info = self._identify_block_device(resolved)
# Build a clear, informative reason
parts = []
if smart_health == 'FAILED':
parts.append(f'Disk /dev/{resolved}: I/O errors detected (SMART: FAILED)')
else:
parts.append(f'Disk /dev/{resolved}: I/O errors detected (SMART: unable to verify)')
if device_info:
parts.append(f'Device: {device_info}')
elif resolved.startswith('ata'):
parts.append(f'Device: ATA controller {raw_device} (could not resolve to physical disk)')
else:
parts.append(f'Device: /dev/{resolved} (not currently detected -- may be disconnected or temporary)')
# Extract useful detail from the raw kernel message
detail = self._translate_ata_error(msg)
if detail:
parts.append(f'Detail: {detail}')
parts.append('Action: Check disk health with "smartctl -a /dev/{}" and consider replacement if SMART reports failures'.format(resolved))
enriched = '\n'.join(parts)
self._emit('disk_io_error', 'CRITICAL', {
'device': device,
'reason': msg[:200],
'device': resolved,
'reason': enriched,
'hostname': self._hostname,
}, entity='disk', entity_id=device)
}, entity='disk', entity_id=resolved)
return
def _resolve_ata_to_disk(self, ata_port: str) -> str:
"""Resolve an ATA port name (ata8) to a physical disk name (sda)."""
try:
port_num = re.search(r'ata(\d+)', ata_port)
if not port_num:
return ata_port
num = port_num.group(1)
# Check /sys/class/ata_port for the mapping
import glob as _glob
for path in _glob.glob(f'/sys/class/ata_port/ata{num}/../../host*/target*/*/block/*'):
disk_name = os.path.basename(path)
if disk_name.startswith('sd') or disk_name.startswith('nvme'):
return disk_name
# Fallback: try scsi_host mapping
for path in _glob.glob(f'/sys/class/ata_port/ata{num}/../../host*/scsi_host/host*/../../target*/*/block/*'):
disk_name = os.path.basename(path)
if disk_name.startswith('sd'):
return disk_name
return ata_port
except Exception:
return ata_port
def _quick_smart_health(self, disk_name: str) -> str:
"""Quick SMART health check. Returns 'PASSED', 'FAILED', or 'UNKNOWN'."""
if not disk_name or disk_name.startswith('ata') or disk_name.startswith('zram'):
return 'UNKNOWN'
try:
dev_path = f'/dev/{disk_name}' if not disk_name.startswith('/') else disk_name
result = subprocess.run(
['smartctl', '--health', '-j', dev_path],
capture_output=True, text=True, timeout=5
)
import json as _json
data = _json.loads(result.stdout)
passed = data.get('smart_status', {}).get('passed', None)
if passed is True:
return 'PASSED'
elif passed is False:
return 'FAILED'
return 'UNKNOWN'
except Exception:
return 'UNKNOWN'
@staticmethod
def _translate_ata_error(msg: str) -> str:
"""Translate common ATA/SCSI error codes to human-readable descriptions."""
error_codes = {
'IDNF': 'sector address not found (possible bad sector or cable issue)',
'UNC': 'uncorrectable read error (bad sector)',
'ABRT': 'command aborted by drive',
'AMNF': 'address mark not found (surface damage)',
'TK0NF': 'track 0 not found (drive hardware failure)',
'BBK': 'bad block detected',
'ICRC': 'interface CRC error (cable or connector issue)',
'MC': 'media changed',
'MCR': 'media change requested',
'WP': 'write protected',
}
parts = []
for code, description in error_codes.items():
if code in msg:
parts.append(description)
if parts:
return '; '.join(parts)
# Try to extract the Emask/SErr/action codes
emask = re.search(r'Emask\s+(0x[0-9a-f]+)', msg)
serr = re.search(r'SErr\s+(0x[0-9a-f]+)', msg)
action = re.search(r'action\s+(0x[0-9a-f]+)', msg)
if emask or serr:
info = []
if emask:
info.append(f'Error mask: {emask.group(1)}')
if serr:
info.append(f'SATA error: {serr.group(1)}')
if action and action.group(1) == '0x0':
info.append('auto-recovered')
return ', '.join(info)
return ''
def _check_cluster_events(self, msg: str, syslog_id: str):
"""Detect cluster split-brain and node disconnect."""
msg_lower = msg.lower()
@@ -613,6 +875,12 @@ class TaskWatcher:
# Cache for active vzdump detection
self._vzdump_active_cache: float = 0 # timestamp of last positive check
self._vzdump_cache_ttl = 5 # cache result for 5s
# Internal tracking: when we see a vzdump task without an end status,
# we mark the timestamp. When we see it complete (status=OK/ERROR),
# we clear it. This supplements the /var/log/pve/tasks/active check
# to avoid timing gaps.
self._vzdump_running_since: float = 0 # 0 = no vzdump tracked
self._vzdump_grace_period = 120 # seconds after vzdump ends to still suppress
def start(self):
if self._running:
@@ -634,13 +902,27 @@ class TaskWatcher:
self._running = False
def _is_vzdump_active(self) -> bool:
"""Check if a vzdump (backup) job is currently running.
"""Check if a vzdump (backup) job is currently running or recently finished.
Reads /var/log/pve/tasks/active which lists all running PVE tasks.
Also verifies the process is actually alive (PID check).
Result is cached for a few seconds to avoid excessive file reads.
Two-layer detection:
1. Internal tracking: TaskWatcher marks vzdump start/end with a grace period
(covers the case where the VM restart arrives milliseconds after vzdump ends)
2. /var/log/pve/tasks/active: reads the active task file and verifies PID
This combination eliminates timing gaps that caused false VM notifications.
"""
now = time.time()
# Layer 1: Internal tracking (most reliable, no file I/O)
if self._vzdump_running_since > 0:
elapsed = now - self._vzdump_running_since
if elapsed < self._vzdump_grace_period:
return True
else:
# Grace period expired -- clear the tracking
self._vzdump_running_since = 0
# Layer 2: /var/log/pve/tasks/active (catches vzdump started by other nodes or cron)
# Negative cache: if we recently confirmed NO vzdump, skip the check
if hasattr(self, '_vzdump_negative_cache') and \
now - self._vzdump_negative_cache < self._vzdump_cache_ttl:
@@ -731,7 +1013,17 @@ class TaskWatcher:
event_type, default_severity = event_info
# Track vzdump (backup) tasks internally for VM suppression.
# When a vzdump starts (no status yet), mark it. When it completes
# (status = OK or ERROR), keep a grace period for the post-backup
# VM restart that follows shortly after.
if task_type == 'vzdump':
if not status:
# Backup just started -- track it
self._vzdump_running_since = time.time()
else:
# Backup just finished -- start grace period for VM restarts
self._vzdump_running_since = time.time() # will expire via grace_period
# Check if task failed
is_error = status and status != 'OK' and status != ''
@@ -768,10 +1060,14 @@ class TaskWatcher:
# Determine entity type from task type
entity = 'ct' if task_type.startswith('vz') else 'vm'
# Backup and replication events are handled EXCLUSIVELY by the PVE
# webhook, which delivers much richer data (full logs, sizes, durations,
# filenames). TaskWatcher skips these entirely to avoid duplicates.
_WEBHOOK_EXCLUSIVE = {'backup_complete', 'backup_fail', 'backup_start',
# Backup completion/failure and replication events are handled
# EXCLUSIVELY by the PVE webhook, which delivers richer data (full
# logs, sizes, durations, filenames). TaskWatcher skips these to
# avoid duplicates.
# NOTE: backup_start is NOT in this set -- PVE's webhook only fires
# when a backup FINISHES, so TaskWatcher is the only source for
# the "backup started" notification.
_WEBHOOK_EXCLUSIVE = {'backup_complete', 'backup_fail',
'replication_complete', 'replication_fail'}
if event_type in _WEBHOOK_EXCLUSIVE:
return