diff --git a/AppImage/components/notification-settings.tsx b/AppImage/components/notification-settings.tsx
index 396387b8..90f8d47d 100644
--- a/AppImage/components/notification-settings.tsx
+++ b/AppImage/components/notification-settings.tsx
@@ -416,39 +416,23 @@ export function NotificationSettings() {
- Add to /etc/proxmox-backup/notifications.cfg on the PBS host:
+ Append to /etc/proxmox-backup/notifications.cfg on the PBS host:
{`webhook: proxmenux-webhook
-\turl http://:8008/api/notifications/webhook
\tmethod post
-\theader Content-Type:application/json
-\theader X-Webhook-Secret:{{ secrets.proxmenux_secret }}
+\turl http://:8008/api/notifications/webhook
matcher: proxmenux-pbs
\ttarget proxmenux-webhook
\tmatch-severity warning,error`}
-
-
- Add to /etc/proxmox-backup/notifications-priv.cfg:
-
-
-{`webhook: proxmenux-webhook
-\tsecret proxmenux_secret `}
-
-
-
-
- {"Replace with the IP of this PVE node (not 127.0.0.1, unless PBS runs on the same host)."}
-
-
- {"Replace with the webhook secret shown in your notification settings."}
-
-
+
+ {"Replace with the IP of this PVE node (not 127.0.0.1, unless PBS runs on the same host). Append at the end -- do not delete existing content."}
+
@@ -1079,7 +1063,7 @@ matcher: proxmenux-pbs
- {"Proxmox must send this value in the X-Webhook-Secret header. Auto-generated on first enable."}
+ {"Used for remote connections only (e.g. PBS on another host). Local PVE webhook runs on localhost and does not need this header."}
@@ -1108,28 +1092,19 @@ matcher: proxmenux-pbs
(Verify, Prune, GC, Sync) require separate configuration on the PBS server.
- Add to /etc/proxmox-backup/notifications.cfg:
+ Append to /etc/proxmox-backup/notifications.cfg:
{`webhook: proxmenux-webhook
-\turl http://:8008/api/notifications/webhook
\tmethod post
-\theader Content-Type:application/json
-\theader X-Webhook-Secret:{{ secrets.proxmenux_secret }}
+\turl http://:8008/api/notifications/webhook
matcher: proxmenux-pbs
\ttarget proxmenux-webhook
\tmatch-severity warning,error`}
-
-
- Add to /etc/proxmox-backup/notifications-priv.cfg:
-
-
-{`webhook: proxmenux-webhook
-\tsecret proxmenux_secret `}
- {"Replace with this node's IP and with the webhook secret above."}
+ {"Replace with this node's IP. Append at the end -- do not delete existing content."}
diff --git a/AppImage/scripts/flask_notification_routes.py b/AppImage/scripts/flask_notification_routes.py
index c774d680..3021fa60 100644
--- a/AppImage/scripts/flask_notification_routes.py
+++ b/AppImage/scripts/flask_notification_routes.py
@@ -188,27 +188,19 @@ def setup_proxmox_webhook():
'error': None,
}
- def _build_fallback(secret_val):
+ def _build_fallback():
"""Build manual instructions as fallback."""
return [
- "# Add these blocks to /etc/pve/notifications.cfg",
- "# (append at the end, do NOT delete existing content):",
+ "# Append to END of /etc/pve/notifications.cfg",
+ "# (do NOT delete existing content):",
"",
f"webhook: {ENDPOINT_ID}",
+ f"\tmethod post",
f"\turl {WEBHOOK_URL}",
- "\tmethod post",
- "\theader Content-Type:application/json",
- f"\theader X-Webhook-Secret:{{{{ secrets.proxmenux_secret }}}}",
"",
f"matcher: {MATCHER_ID}",
f"\ttarget {ENDPOINT_ID}",
"\tmatch-severity warning,error",
- "",
- "# Add this block to /etc/pve/priv/notifications.cfg",
- "# (append at the end, do NOT delete existing content):",
- "",
- f"webhook: {ENDPOINT_ID}",
- f"\tsecret proxmenux_secret {secret_val}",
]
def _read_file(path):
@@ -233,178 +225,136 @@ def setup_proxmox_webhook():
except Exception:
pass # Best-effort backup
- def _parse_blocks(text):
- """Parse PVE config into list of (block_type, block_name, block_text).
+ def _remove_our_blocks(text, headers_to_remove):
+ """Remove only blocks whose header line matches one of ours.
- A block starts with a non-whitespace line like 'type: name'
- and includes all subsequent lines that start with whitespace.
- Lines between blocks (blank lines, comments) are preserved as
- anonymous blocks with type=None, name=None.
+ Preserves ALL other content byte-for-byte.
+ A block = header line + indented continuation lines + trailing blank line.
"""
- blocks = []
- current_header = None
- current_lines = []
- gap_lines = [] # blank/comment lines between blocks
+ lines = text.splitlines(keepends=True)
+ cleaned = []
+ skip_block = False
- for line in text.splitlines(keepends=True):
+ for line in lines:
stripped = line.strip()
- # Check if this is a block header (non-whitespace, contains ':')
- if stripped and not line[0].isspace() and ':' in stripped:
- # Save previous block
- if current_header is not None:
- blocks.append(current_header + (''.join(current_lines),))
- current_lines = []
- elif current_lines:
- blocks.append((None, None, ''.join(current_lines)))
- current_lines = []
-
- # Save any gap lines as anonymous block
- if gap_lines:
- blocks.append((None, None, ''.join(gap_lines)))
- gap_lines = []
-
- # Parse header
- parts = stripped.split(':', 1)
- btype = parts[0].strip()
- bname = parts[1].strip() if len(parts) > 1 else ''
- current_header = (btype, bname)
- current_lines = [line]
+ # Non-whitespace line with ':' => block header
+ if stripped and not line[0:1].isspace() and ':' in stripped:
+ if stripped in headers_to_remove:
+ skip_block = True
+ continue
+ else:
+ skip_block = False
- elif current_header is not None and line[0:1].isspace():
- # Continuation line (starts with whitespace)
- current_lines.append(line)
+ if skip_block:
+ if not stripped:
+ # Blank line = block separator, consume it and stop skipping
+ skip_block = False
+ continue
+ elif line[0:1].isspace():
+ # Indented = continuation of our block
+ continue
+ else:
+ # New block started
+ skip_block = False
- else:
- # Gap line (blank, comment, or anything between blocks)
- if current_header is not None:
- blocks.append(current_header + (''.join(current_lines),))
- current_header = None
- current_lines = []
- gap_lines.append(line)
+ cleaned.append(line)
- # Flush remaining
- if current_header is not None:
- blocks.append(current_header + (''.join(current_lines),))
- elif current_lines:
- blocks.append((None, None, ''.join(current_lines)))
- if gap_lines:
- blocks.append((None, None, ''.join(gap_lines)))
-
- return blocks
-
- def _upsert_block(blocks, block_type, block_name, new_text):
- """Replace block if exists, otherwise append. Returns new list."""
- found = False
- result_blocks = []
- for btype, bname, btext in blocks:
- if btype == block_type and bname == block_name:
- result_blocks.append((block_type, block_name, new_text))
- found = True
- else:
- result_blocks.append((btype, bname, btext))
- if not found:
- # Append with blank line separator
- result_blocks.append((None, None, '\n'))
- result_blocks.append((block_type, block_name, new_text))
- return result_blocks
-
- def _blocks_to_text(blocks):
- """Reassemble blocks into config text."""
- return ''.join(btext for _, _, btext in blocks)
-
- def _write_safe(path, content, original_content):
- """Write content to path. On failure, try to restore original."""
- try:
- with open(path, 'w') as f:
- f.write(content)
- return None
- except PermissionError:
- return f'Permission denied writing {path}'
- except Exception as e:
- # Try to restore original
- try:
- if original_content is not None:
- with open(path, 'w') as f:
- f.write(original_content)
- except Exception:
- pass
- return str(e)
+ return ''.join(cleaned)
try:
- # ── Step 1: Ensure webhook secret exists ──
+ # ── Step 1: Ensure webhook secret exists (for our own internal use) ──
secret = notification_manager.get_webhook_secret()
if not secret:
secret = secrets_mod.token_urlsafe(32)
notification_manager._save_setting('webhook_secret', secret)
- # ── Step 2: Read both config files ──
+ # ── Step 2: Read main config ──
cfg_text, err = _read_file(NOTIFICATIONS_CFG)
if err:
result['error'] = err
- result['fallback_commands'] = _build_fallback(secret)
+ result['fallback_commands'] = _build_fallback()
return jsonify(result), 200
+ # ── Step 3: Read priv config (to clean up any broken blocks we wrote before) ──
priv_text, err = _read_file(PRIV_CFG)
if err:
- result['error'] = err
- result['fallback_commands'] = _build_fallback(secret)
- return jsonify(result), 200
+ priv_text = None # Non-fatal, we just won't clean it
- # ── Step 3: Create backups ──
+ # ── Step 4: Create backups before ANY modification ──
_backup_file(NOTIFICATIONS_CFG)
- _backup_file(PRIV_CFG)
+ if priv_text is not None:
+ _backup_file(PRIV_CFG)
- # ── Step 4: Parse existing blocks ──
- cfg_blocks = _parse_blocks(cfg_text)
- priv_blocks = _parse_blocks(priv_text)
+ # ── Step 5: Remove any previous proxmenux blocks from BOTH files ──
+ our_headers = {
+ f'webhook: {ENDPOINT_ID}',
+ f'matcher: {MATCHER_ID}',
+ }
- # ── Step 5: Build our new blocks ──
- endpoint_text = (
+ cleaned_cfg = _remove_our_blocks(cfg_text, our_headers)
+
+ if priv_text is not None:
+ cleaned_priv = _remove_our_blocks(priv_text, our_headers)
+
+ # ── Step 6: Build new blocks ──
+ # Exact format from a real working PVE server:
+ # webhook: name
+ # \tmethod post
+ # \turl http://...
+ #
+ # NO header lines -- localhost webhook doesn't need them.
+ # PVE header format is: header name=X-Key,value=
+ # PVE secret format is: secret name=key,value=
+ # Neither is needed for localhost calls.
+
+ endpoint_block = (
f"webhook: {ENDPOINT_ID}\n"
- f"\turl {WEBHOOK_URL}\n"
f"\tmethod post\n"
- f"\theader Content-Type:application/json\n"
- f"\theader X-Webhook-Secret:{{{{ secrets.proxmenux_secret }}}}\n"
+ f"\turl {WEBHOOK_URL}\n"
)
- matcher_text = (
+ matcher_block = (
f"matcher: {MATCHER_ID}\n"
f"\ttarget {ENDPOINT_ID}\n"
f"\tmatch-severity warning,error\n"
)
- priv_secret_text = (
- f"webhook: {ENDPOINT_ID}\n"
- f"\tsecret proxmenux_secret {secret}\n"
- )
+ # ── Step 7: Append our blocks to cleaned main config ──
+ # Ensure existing content ends cleanly
+ if cleaned_cfg and not cleaned_cfg.endswith('\n'):
+ cleaned_cfg += '\n'
+ if cleaned_cfg and not cleaned_cfg.endswith('\n\n'):
+ cleaned_cfg += '\n'
- # ── Step 6: Upsert (replace or append) our blocks only ──
- cfg_blocks = _upsert_block(cfg_blocks, 'webhook', ENDPOINT_ID, endpoint_text)
- cfg_blocks = _upsert_block(cfg_blocks, 'matcher', MATCHER_ID, matcher_text)
- priv_blocks = _upsert_block(priv_blocks, 'webhook', ENDPOINT_ID, priv_secret_text)
+ new_cfg = cleaned_cfg + endpoint_block + '\n' + matcher_block
- new_cfg = _blocks_to_text(cfg_blocks)
- new_priv = _blocks_to_text(priv_blocks)
-
- # ── Step 7: Write back (with rollback on error) ──
- err = _write_safe(NOTIFICATIONS_CFG, new_cfg, cfg_text)
- if err:
- result['error'] = err
- result['fallback_commands'] = _build_fallback(secret)
+ # ── Step 8: Write main config ──
+ try:
+ with open(NOTIFICATIONS_CFG, 'w') as f:
+ f.write(new_cfg)
+ except PermissionError:
+ result['error'] = f'Permission denied writing {NOTIFICATIONS_CFG}'
+ result['fallback_commands'] = _build_fallback()
+ return jsonify(result), 200
+ except Exception as e:
+ # Rollback
+ try:
+ with open(NOTIFICATIONS_CFG, 'w') as f:
+ f.write(cfg_text)
+ except Exception:
+ pass
+ result['error'] = str(e)
+ result['fallback_commands'] = _build_fallback()
return jsonify(result), 200
- err = _write_safe(PRIV_CFG, new_priv, priv_text)
- if err:
- # Rollback main config
- _write_safe(NOTIFICATIONS_CFG, cfg_text, None)
- result['error'] = f'Secret file failed: {err}. Main config rolled back.'
- result['fallback_commands'] = [
- f"# Add to {PRIV_CFG} (append, don't overwrite):",
- f"webhook: {ENDPOINT_ID}",
- f"\tsecret proxmenux_secret {secret}",
- ]
- return jsonify(result), 200
+ # ── Step 9: Clean priv config (remove our broken blocks, write nothing new) ──
+ if priv_text is not None and cleaned_priv != priv_text:
+ try:
+ with open(PRIV_CFG, 'w') as f:
+ f.write(cleaned_priv)
+ except Exception:
+ pass # Non-fatal, priv cleanup is best-effort
result['configured'] = True
result['secret'] = secret
@@ -412,29 +362,118 @@ def setup_proxmox_webhook():
except Exception as e:
result['error'] = str(e)
- try:
- result['fallback_commands'] = _build_fallback(
- notification_manager.get_webhook_secret() or 'YOUR_SECRET'
- )
- except Exception:
- result['fallback_commands'] = _build_fallback('YOUR_SECRET')
+ result['fallback_commands'] = _build_fallback()
return jsonify(result), 200
+@notification_bp.route('/api/notifications/proxmox/read-cfg', methods=['GET'])
+def read_pve_notification_cfg():
+ """Diagnostic: return raw content of PVE notification config files.
+
+ GET /api/notifications/proxmox/read-cfg
+ Returns both notifications.cfg and priv/notifications.cfg content.
+ """
+ import os
+
+ files = {
+ 'notifications_cfg': '/etc/pve/notifications.cfg',
+ 'priv_cfg': '/etc/pve/priv/notifications.cfg',
+ }
+
+ # Also look for any backups we created
+ backup_dir = '/etc/pve'
+ priv_backup_dir = '/etc/pve/priv'
+
+ result = {}
+ for key, path in files.items():
+ try:
+ with open(path, 'r') as f:
+ result[key] = {
+ 'path': path,
+ 'content': f.read(),
+ 'size': os.path.getsize(path),
+ 'error': None,
+ }
+ except FileNotFoundError:
+ result[key] = {'path': path, 'content': None, 'size': 0, 'error': 'file_not_found'}
+ except PermissionError:
+ result[key] = {'path': path, 'content': None, 'size': 0, 'error': 'permission_denied'}
+ except Exception as e:
+ result[key] = {'path': path, 'content': None, 'size': 0, 'error': str(e)}
+
+ # Find backups
+ backups = []
+ for d in [backup_dir, priv_backup_dir]:
+ try:
+ for fname in sorted(os.listdir(d)):
+ if 'proxmenux_backup' in fname:
+ fpath = os.path.join(d, fname)
+ try:
+ with open(fpath, 'r') as f:
+ backups.append({
+ 'path': fpath,
+ 'content': f.read(),
+ 'size': os.path.getsize(fpath),
+ })
+ except Exception:
+ backups.append({'path': fpath, 'content': None, 'error': 'read_failed'})
+ except Exception:
+ pass
+
+ result['backups'] = backups
+ return jsonify(result), 200
+
+
+@notification_bp.route('/api/notifications/proxmox/restore-cfg', methods=['POST'])
+def restore_pve_notification_cfg():
+ """Restore PVE notification config from our backup.
+
+ POST /api/notifications/proxmox/restore-cfg
+ Finds the most recent proxmenux_backup and restores it.
+ """
+ import os
+ import shutil
+
+ files_to_restore = {
+ '/etc/pve': '/etc/pve/notifications.cfg',
+ '/etc/pve/priv': '/etc/pve/priv/notifications.cfg',
+ }
+
+ restored = []
+ errors = []
+
+ for search_dir, target_path in files_to_restore.items():
+ try:
+ candidates = sorted([
+ f for f in os.listdir(search_dir)
+ if 'proxmenux_backup' in f and f.startswith('notifications.cfg')
+ ], reverse=True)
+
+ if candidates:
+ backup_path = os.path.join(search_dir, candidates[0])
+ shutil.copy2(backup_path, target_path)
+ restored.append({'target': target_path, 'from_backup': backup_path})
+ else:
+ errors.append({'target': target_path, 'error': 'no_backup_found'})
+ except Exception as e:
+ errors.append({'target': target_path, 'error': str(e)})
+
+ return jsonify({
+ 'restored': restored,
+ 'errors': errors,
+ 'success': len(errors) == 0 and len(restored) > 0,
+ }), 200
+
+
@notification_bp.route('/api/notifications/webhook', methods=['POST'])
def proxmox_webhook():
"""Receive native Proxmox VE notification webhooks (hardened).
Security layers:
- 1. Rate limiting (60 req/min) -- always
- 2. Shared secret (X-Webhook-Secret) -- always required
- 3. Anti-replay timestamp (60s window) -- remote only
- 4. Replay cache (signature dedup) -- remote only
- 5. IP allowlist (optional) -- remote only
-
- Localhost callers (127.0.0.1 / ::1) bypass layers 3-5 because Proxmox
- cannot inject dynamic timestamp headers. The shared secret is still
- required for localhost to prevent any local process from injecting events.
+ Localhost (127.0.0.1 / ::1): rate limiting only.
+ PVE calls us on localhost and cannot send custom auth headers,
+ so we trust the loopback interface (only local processes can reach it).
+ Remote: rate limiting + shared secret + timestamp + replay + IP allowlist.
"""
_reject = lambda code, error, status: (jsonify({'accepted': False, 'error': error}), status)
@@ -447,23 +486,21 @@ def proxmox_webhook():
resp.headers['Retry-After'] = '60'
return resp, 429
- # ── Layer 2: Shared secret (always required) ──
- try:
- configured_secret = notification_manager.get_webhook_secret()
- except Exception:
- configured_secret = ''
-
- if not configured_secret:
- return _reject(500, 'webhook_not_configured', 500)
-
- request_secret = request.headers.get('X-Webhook-Secret', '')
- if not request_secret:
- return _reject(401, 'missing_secret', 401)
- if not hmac.compare_digest(configured_secret, request_secret):
- return _reject(401, 'invalid_secret', 401)
-
- # ── Layers 3-5: Remote-only checks ──
+ # ── Layers 2-5: Remote-only checks ──
if not is_localhost:
+ # Layer 2: Shared secret
+ try:
+ configured_secret = notification_manager.get_webhook_secret()
+ except Exception:
+ configured_secret = ''
+
+ if configured_secret:
+ request_secret = request.headers.get('X-Webhook-Secret', '')
+ if not request_secret:
+ return _reject(401, 'missing_secret', 401)
+ if not hmac.compare_digest(configured_secret, request_secret):
+ return _reject(401, 'invalid_secret', 401)
+
# Layer 3: Anti-replay timestamp
ts_header = request.headers.get('X-ProxMenux-Timestamp', '')
if not ts_header: