add simple webhook feature for peer, interface and user events (#398)

This commit is contained in:
Christoph
2025-04-19 21:29:26 +02:00
parent e75a32e4d0
commit 9354a1d9d3
14 changed files with 411 additions and 51 deletions

View File

@@ -37,6 +37,9 @@ type WireguardDatabaseRepo interface {
type FileSystemRepo interface {
// WriteFile writes the contents to the file at the given path.
WriteFile(path string, contents io.Reader) error
// DeleteFile deletes the file at the given path.
DeleteFile(path string) error
}
type TemplateRenderer interface {
@@ -109,22 +112,37 @@ func (m Manager) createStorageDirectory() error {
}
func (m Manager) connectToMessageBus() {
_ = m.bus.Subscribe(app.TopicInterfaceUpdated, m.handleInterfaceUpdatedEvent)
_ = m.bus.Subscribe(app.TopicInterfaceCreated, m.handleInterfaceSavedEvent)
_ = m.bus.Subscribe(app.TopicInterfaceUpdated, m.handleInterfaceSavedEvent)
_ = m.bus.Subscribe(app.TopicInterfaceDeleted, m.handleInterfaceDeleteEvent)
_ = m.bus.Subscribe(app.TopicPeerInterfaceUpdated, m.handlePeerInterfaceUpdatedEvent)
}
func (m Manager) handleInterfaceUpdatedEvent(iface *domain.Interface) {
func (m Manager) handleInterfaceSavedEvent(iface domain.Interface) {
if !iface.SaveConfig {
return
}
slog.Debug("handling interface updated event", "interface", iface.Identifier)
slog.Debug("handling interface save event", "interface", iface.Identifier)
err := m.PersistInterfaceConfig(context.Background(), iface.Identifier)
if err != nil {
slog.Error("failed to automatically persist interface config",
"interface", iface.Identifier,
"error", err)
"interface", iface.Identifier, "error", err)
}
}
func (m Manager) handleInterfaceDeleteEvent(iface domain.Interface) {
if !iface.SaveConfig {
return
}
slog.Debug("handling interface delete event", "interface", iface.Identifier)
err := m.UnpersistInterfaceConfig(context.Background(), iface.GetConfigFileName())
if err != nil {
slog.Error("failed to remove persisted interface config",
"interface", iface.Identifier, "error", err)
}
}
@@ -251,6 +269,15 @@ func (m Manager) PersistInterfaceConfig(ctx context.Context, id domain.Interface
return nil
}
// UnpersistInterfaceConfig removes the configuration file for the given interface from the file system.
func (m Manager) UnpersistInterfaceConfig(_ context.Context, filename string) error {
if err := m.fsRepo.DeleteFile(filename); err != nil {
return fmt.Errorf("failed to remove interface config: %w", err)
}
return nil
}
type nopCloser struct {
io.Writer
}

View File

@@ -1,21 +1,50 @@
package app
// region misc-events
const TopicAuthLogin = "auth:login"
const TopicRouteUpdate = "route:update"
const TopicRouteRemove = "route:remove"
// endregion misc-events
// region user-events
const TopicUserCreated = "user:created"
const TopicUserDeleted = "user:deleted"
const TopicUserUpdated = "user:updated"
const TopicUserApiEnabled = "user:api:enabled"
const TopicUserApiDisabled = "user:api:disabled"
const TopicUserRegistered = "user:registered"
const TopicUserDisabled = "user:disabled"
const TopicUserEnabled = "user:enabled"
const TopicUserDeleted = "user:deleted"
const TopicAuthLogin = "auth:login"
const TopicRouteUpdate = "route:update"
const TopicRouteRemove = "route:remove"
// endregion user-events
// region interface-events
const TopicInterfaceCreated = "interface:created"
const TopicInterfaceUpdated = "interface:updated"
const TopicInterfaceDeleted = "interface:deleted"
// endregion interface-events
// region peer-events
const TopicPeerCreated = "peer:created"
const TopicPeerDeleted = "peer:deleted"
const TopicPeerUpdated = "peer:updated"
const TopicPeerInterfaceUpdated = "peer:interface:updated"
const TopicPeerIdentifierUpdated = "peer:identifier:updated"
// endregion peer-events
// region audit-events
const TopicAuditLoginSuccess = "audit:login:success"
const TopicAuditLoginFailed = "audit:login:failed"
const TopicAuditInterfaceChanged = "audit:interface:changed"
const TopicAuditPeerChanged = "audit:peer:changed"
// endregion audit-events

View File

@@ -77,47 +77,12 @@ func (m Manager) RegisterUser(ctx context.Context, user *domain.User) error {
return err
}
err := m.NewUser(ctx, user)
createdUser, err := m.CreateUser(ctx, user)
if err != nil {
return err
}
m.bus.Publish(app.TopicUserRegistered, user)
return nil
}
// NewUser creates a new user.
func (m Manager) NewUser(ctx context.Context, user *domain.User) error {
if user.Identifier == "" {
return errors.New("missing user identifier")
}
if err := domain.ValidateAdminAccessRights(ctx); err != nil {
return err
}
err := m.users.SaveUser(ctx, user.Identifier, func(u *domain.User) (*domain.User, error) {
u.Identifier = user.Identifier
u.Email = user.Email
u.Source = user.Source
u.ProviderName = user.ProviderName
u.IsAdmin = user.IsAdmin
u.Firstname = user.Firstname
u.Lastname = user.Lastname
u.Phone = user.Phone
u.Department = user.Department
u.Notes = user.Notes
u.ApiToken = user.ApiToken
u.ApiTokenCreated = user.ApiTokenCreated
return u, nil
})
if err != nil {
return fmt.Errorf("failed to save user: %w", err)
}
m.bus.Publish(app.TopicUserCreated, user)
m.bus.Publish(app.TopicUserRegistered, createdUser)
return nil
}
@@ -229,6 +194,8 @@ func (m Manager) UpdateUser(ctx context.Context, user *domain.User) (*domain.Use
return nil, fmt.Errorf("update failure: %w", err)
}
m.bus.Publish(app.TopicUserUpdated, *user)
switch {
case !existingUser.IsDisabled() && user.IsDisabled():
m.bus.Publish(app.TopicUserDisabled, *user)
@@ -241,6 +208,10 @@ func (m Manager) UpdateUser(ctx context.Context, user *domain.User) (*domain.Use
// CreateUser creates a new user.
func (m Manager) CreateUser(ctx context.Context, user *domain.User) (*domain.User, error) {
if user.Identifier == "" {
return nil, errors.New("missing user identifier")
}
if err := domain.ValidateAdminAccessRights(ctx); err != nil {
return nil, err
}
@@ -270,6 +241,8 @@ func (m Manager) CreateUser(ctx context.Context, user *domain.User) (*domain.Use
return nil, fmt.Errorf("creation failure: %w", err)
}
m.bus.Publish(app.TopicUserCreated, *user)
return user, nil
}
@@ -321,6 +294,7 @@ func (m Manager) ActivateApi(ctx context.Context, id domain.UserIdentifier) (*do
return nil, fmt.Errorf("update failure: %w", err)
}
m.bus.Publish(app.TopicUserUpdated, user)
m.bus.Publish(app.TopicUserApiEnabled, user)
return user, nil
@@ -348,6 +322,7 @@ func (m Manager) DeactivateApi(ctx context.Context, id domain.UserIdentifier) (*
return nil, fmt.Errorf("update failure: %w", err)
}
m.bus.Publish(app.TopicUserUpdated, user)
m.bus.Publish(app.TopicUserApiDisabled, user)
return user, nil
@@ -555,7 +530,7 @@ func (m Manager) updateLdapUsers(
// create new user
slog.Debug("creating new user from provider", "user", user.Identifier, "provider", provider.ProviderName)
err := m.NewUser(tctx, user)
_, err := m.CreateUser(tctx, user)
if err != nil {
cancel()
return fmt.Errorf("create error for user id %s: %w", user.Identifier, err)

View File

@@ -0,0 +1,185 @@
package webhooks
import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"github.com/h44z/wg-portal/internal/app"
"github.com/h44z/wg-portal/internal/config"
"github.com/h44z/wg-portal/internal/domain"
)
// region dependencies
type EventBus interface {
// Publish sends a message to the message bus.
Publish(topic string, args ...any)
// Subscribe subscribes to a topic
Subscribe(topic string, fn interface{}) error
}
// endregion dependencies
type Manager struct {
cfg *config.Config
bus EventBus
client *http.Client
}
// NewManager creates a new webhook manager instance.
func NewManager(cfg *config.Config, bus EventBus) (*Manager, error) {
m := &Manager{
cfg: cfg,
bus: bus,
client: &http.Client{
Timeout: cfg.Webhook.Timeout,
},
}
m.connectToMessageBus()
return m, nil
}
// StartBackgroundJobs starts background jobs for the webhook manager.
// This method is non-blocking and returns immediately.
func (m Manager) StartBackgroundJobs(_ context.Context) {
// this is a no-op for now
}
func (m Manager) connectToMessageBus() {
if m.cfg.Webhook.Url == "" {
slog.Info("[WEBHOOK] no webhook configured, skipping event-bus subscription")
return
}
_ = m.bus.Subscribe(app.TopicUserCreated, m.handleUserCreateEvent)
_ = m.bus.Subscribe(app.TopicUserUpdated, m.handleUserUpdateEvent)
_ = m.bus.Subscribe(app.TopicUserDeleted, m.handleUserDeleteEvent)
_ = m.bus.Subscribe(app.TopicPeerCreated, m.handlePeerCreateEvent)
_ = m.bus.Subscribe(app.TopicPeerUpdated, m.handlePeerUpdateEvent)
_ = m.bus.Subscribe(app.TopicPeerDeleted, m.handlePeerDeleteEvent)
_ = m.bus.Subscribe(app.TopicInterfaceCreated, m.handleInterfaceCreateEvent)
_ = m.bus.Subscribe(app.TopicInterfaceUpdated, m.handleInterfaceUpdateEvent)
_ = m.bus.Subscribe(app.TopicInterfaceDeleted, m.handleInterfaceDeleteEvent)
}
func (m Manager) sendWebhook(ctx context.Context, data io.Reader) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.cfg.Webhook.Url, data)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
if m.cfg.Webhook.Authentication != "" {
req.Header.Set("Authorization", m.cfg.Webhook.Authentication)
}
resp, err := m.client.Do(req)
if err != nil {
return err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
slog.Error("[WEBHOOK] failed to close response body", "error", err)
}
}(resp.Body)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("webhook request failed with status: %s", resp.Status)
}
return nil
}
func (m Manager) handleUserCreateEvent(user domain.User) {
m.handleGenericEvent(WebhookEventCreate, user)
}
func (m Manager) handleUserUpdateEvent(user domain.User) {
m.handleGenericEvent(WebhookEventUpdate, user)
}
func (m Manager) handleUserDeleteEvent(user domain.User) {
m.handleGenericEvent(WebhookEventDelete, user)
}
func (m Manager) handlePeerCreateEvent(peer domain.Peer) {
m.handleGenericEvent(WebhookEventCreate, peer)
}
func (m Manager) handlePeerUpdateEvent(peer domain.Peer) {
m.handleGenericEvent(WebhookEventUpdate, peer)
}
func (m Manager) handlePeerDeleteEvent(peer domain.Peer) {
m.handleGenericEvent(WebhookEventDelete, peer)
}
func (m Manager) handleInterfaceCreateEvent(iface domain.Interface) {
m.handleGenericEvent(WebhookEventCreate, iface)
}
func (m Manager) handleInterfaceUpdateEvent(iface domain.Interface) {
m.handleGenericEvent(WebhookEventUpdate, iface)
}
func (m Manager) handleInterfaceDeleteEvent(iface domain.Interface) {
m.handleGenericEvent(WebhookEventDelete, iface)
}
func (m Manager) handleGenericEvent(action WebhookEvent, payload any) {
eventData, err := m.createWebhookData(action, payload)
if err != nil {
slog.Error("[WEBHOOK] failed to create webhook data", "error", err, "action", action,
"payload", fmt.Sprintf("%T", payload))
return
}
eventJson, err := eventData.Serialize()
if err != nil {
slog.Error("[WEBHOOK] failed to serialize event data", "error", err, "action", action,
"payload", fmt.Sprintf("%T", payload), "identifier", eventData.Identifier)
return
}
err = m.sendWebhook(context.Background(), eventJson)
if err != nil {
slog.Error("[WEBHOOK] failed to execute webhook", "error", err, "action", action,
"payload", fmt.Sprintf("%T", payload), "identifier", eventData.Identifier)
return
}
slog.Info("[WEBHOOK] executed webhook", "action", action, "payload", fmt.Sprintf("%T", payload),
"identifier", eventData.Identifier)
}
func (m Manager) createWebhookData(action WebhookEvent, payload any) (*WebhookData, error) {
d := &WebhookData{
Event: action,
Payload: payload,
}
switch v := payload.(type) {
case domain.User:
d.Entity = WebhookEntityUser
d.Identifier = string(v.Identifier)
case domain.Peer:
d.Entity = WebhookEntityPeer
d.Identifier = string(v.Identifier)
case domain.Interface:
d.Entity = WebhookEntityInterface
d.Identifier = string(v.Identifier)
default:
return nil, fmt.Errorf("unsupported payload type: %T", v)
}
return d, nil
}

View File

@@ -0,0 +1,48 @@
package webhooks
import (
"bytes"
"encoding/json"
"io"
)
// WebhookData is the data structure for the webhook payload.
type WebhookData struct {
// Event is the event type (e.g. create, update, delete)
Event WebhookEvent `json:"event" example:"create"`
// Entity is the entity type (e.g. user, peer, interface)
Entity WebhookEntity `json:"entity" example:"user"`
// Identifier is the identifier of the entity
Identifier string `json:"identifier" example:"user-123"`
// Payload is the payload of the event
Payload any `json:"payload"`
}
// Serialize serializes the WebhookData to JSON and returns it as an io.Reader.
func (d *WebhookData) Serialize() (io.Reader, error) {
data, err := json.Marshal(d)
if err != nil {
return nil, err
}
return bytes.NewReader(data), nil
}
type WebhookEntity = string
const (
WebhookEntityUser WebhookEntity = "user"
WebhookEntityPeer WebhookEntity = "peer"
WebhookEntityInterface WebhookEntity = "interface"
)
type WebhookEvent = string
const (
WebhookEventCreate WebhookEvent = "create"
WebhookEventUpdate WebhookEvent = "update"
WebhookEventDelete WebhookEvent = "delete"
)

View File

@@ -410,6 +410,8 @@ func (m Manager) CreateInterface(ctx context.Context, in *domain.Interface) (*do
return nil, fmt.Errorf("creation failure: %w", err)
}
m.bus.Publish(app.TopicInterfaceCreated, *in)
return in, nil
}
@@ -433,6 +435,8 @@ func (m Manager) UpdateInterface(ctx context.Context, in *domain.Interface) (*do
return nil, nil, fmt.Errorf("update failure: %w", err)
}
m.bus.Publish(app.TopicInterfaceUpdated, *in)
return in, existingPeers, nil
}
@@ -490,6 +494,8 @@ func (m Manager) DeleteInterface(ctx context.Context, id domain.InterfaceIdentif
return fmt.Errorf("post-delete hooks failed: %w", err)
}
m.bus.Publish(app.TopicInterfaceDeleted, *existingInterface)
return nil
}
@@ -549,7 +555,6 @@ func (m Manager) saveInterface(ctx context.Context, iface *domain.Interface) (
return nil, fmt.Errorf("post-save hooks failed: %w", err)
}
m.bus.Publish(app.TopicInterfaceUpdated, iface)
m.bus.Publish(app.TopicAuditInterfaceChanged, domain.AuditEventWrapper[audit.InterfaceEvent]{
Ctx: ctx,
Event: audit.InterfaceEvent{

View File

@@ -204,6 +204,8 @@ func (m Manager) CreatePeer(ctx context.Context, peer *domain.Peer) (*domain.Pee
return nil, fmt.Errorf("creation failure: %w", err)
}
m.bus.Publish(app.TopicPeerCreated, *peer)
return peer, nil
}
@@ -246,6 +248,8 @@ func (m Manager) CreateMultiplePeers(
createdPeers := make([]domain.Peer, len(newPeers))
for i := range newPeers {
createdPeers[i] = *newPeers[i]
m.bus.Publish(app.TopicPeerCreated, *newPeers[i])
}
return createdPeers, nil
@@ -315,6 +319,8 @@ func (m Manager) UpdatePeer(ctx context.Context, peer *domain.Peer) (*domain.Pee
}
}
m.bus.Publish(app.TopicPeerUpdated, *peer)
return peer, nil
}
@@ -343,6 +349,7 @@ func (m Manager) DeletePeer(ctx context.Context, id domain.PeerIdentifier) error
return fmt.Errorf("failed to delete peer %s: %w", id, err)
}
m.bus.Publish(app.TopicPeerDeleted, *peer)
// Update routes after peers have changed
m.bus.Publish(app.TopicRouteUpdate, "peers updated")
// Update interface after peers have changed
@@ -428,6 +435,7 @@ func (m Manager) savePeers(ctx context.Context, peers ...*domain.Peer) error {
}
// publish event
m.bus.Publish(app.TopicAuditPeerChanged, domain.AuditEventWrapper[audit.PeerEvent]{
Ctx: ctx,
Event: audit.PeerEvent{