Merge branch 'master' into mikrotik_integration

# Conflicts:
#	internal/app/api/v0/handlers/endpoint_config.go
#	internal/app/api/v0/model/models.go
#	internal/app/wireguard/statistics.go
#	internal/app/wireguard/wireguard_interfaces.go
This commit is contained in:
Christoph Haas
2025-07-29 22:16:00 +02:00
62 changed files with 1383 additions and 378 deletions

View File

@@ -36,6 +36,8 @@ type StatisticsMetricsServer interface {
type StatisticsEventBus interface {
// Subscribe subscribes to a topic
Subscribe(topic string, fn interface{}) error
// Publish sends a message to the message bus.
Publish(topic string, args ...any)
}
type pingJob struct {
@@ -53,6 +55,8 @@ type StatisticsCollector struct {
db StatisticsDatabaseRepo
wg *ControllerManager
ms StatisticsMetricsServer
peerChangeEvent chan domain.PeerIdentifier
}
// NewStatisticsCollector creates a new statistics collector.
@@ -169,8 +173,12 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
continue
}
for _, peer := range peers {
var connectionStateChanged bool
var newPeerStatus domain.PeerStatus
err = c.db.UpdatePeerStatus(ctx, peer.Identifier,
func(p *domain.PeerStatus) (*domain.PeerStatus, error) {
wasConnected := p.IsConnected
var lastHandshake *time.Time
if !peer.LastHandshake.IsZero() {
lastHandshake = &peer.LastHandshake
@@ -184,6 +192,13 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
p.BytesTransmitted = peer.BytesDownload // store bytes that where received from the peer and sent by the server
p.Endpoint = peer.Endpoint
p.LastHandshake = lastHandshake
p.CalcConnected()
if wasConnected != p.IsConnected {
slog.Debug("peer connection state changed", "peer", peer.Identifier, "connected", p.IsConnected)
connectionStateChanged = true
newPeerStatus = *p // store new status for event publishing
}
// Update prometheus metrics
go c.updatePeerMetrics(ctx, *p)
@@ -195,6 +210,17 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
} else {
slog.Debug("updated peer status", "peer", peer.Identifier)
}
if connectionStateChanged {
peerModel, err := c.db.GetPeer(ctx, peer.Identifier)
if err != nil {
slog.Error("failed to fetch peer for data collection", "peer", peer.Identifier, "error",
err)
continue
}
// publish event if connection state changed
c.bus.Publish(app.TopicPeerStateChanged, newPeerStatus, *peerModel)
}
}
}
}
@@ -301,12 +327,18 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) {
for job := range c.pingJobs {
peer := job.Peer
backend := job.Backend
var connectionStateChanged bool
var newPeerStatus domain.PeerStatus
peerPingable := c.isPeerPingable(ctx, backend, peer)
slog.Debug("peer ping check completed", "peer", peer.Identifier, "pingable", peerPingable)
now := time.Now()
err := c.db.UpdatePeerStatus(ctx, peer.Identifier,
func(p *domain.PeerStatus) (*domain.PeerStatus, error) {
wasConnected := p.IsConnected
if peerPingable {
p.IsPingable = true
p.LastPing = &now
@@ -314,6 +346,13 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) {
p.IsPingable = false
p.LastPing = nil
}
p.UpdatedAt = time.Now()
p.CalcConnected()
if wasConnected != p.IsConnected {
connectionStateChanged = true
newPeerStatus = *p // store new status for event publishing
}
// Update prometheus metrics
go c.updatePeerMetrics(ctx, *p)
@@ -325,6 +364,11 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) {
} else {
slog.Debug("updated peer ping status", "peer", peer.Identifier)
}
if connectionStateChanged {
// publish event if connection state changed
c.bus.Publish(app.TopicPeerStateChanged, newPeerStatus, peer)
}
}
}

View File

@@ -471,7 +471,7 @@ func (m Manager) DeleteInterface(ctx context.Context, id domain.InterfaceIdentif
physicalInterface, _ := m.wg.GetController(*existingInterface).GetInterface(ctx, id)
if err := m.handleInterfacePreSaveHooks(true, existingInterface); err != nil {
if err := m.handleInterfacePreSaveHooks(existingInterface, !existingInterface.IsDisabled(), false); err != nil {
return fmt.Errorf("pre-delete hooks failed: %w", err)
}
@@ -500,7 +500,7 @@ func (m Manager) DeleteInterface(ctx context.Context, id domain.InterfaceIdentif
Table: existingInterface.GetRoutingTable(),
})
if err := m.handleInterfacePostSaveHooks(true, existingInterface); err != nil {
if err := m.handleInterfacePostSaveHooks(existingInterface, !existingInterface.IsDisabled(), false); err != nil {
return fmt.Errorf("post-delete hooks failed: %w", err)
}
@@ -519,9 +519,9 @@ func (m Manager) saveInterface(ctx context.Context, iface *domain.Interface) (
return nil, fmt.Errorf("interface validation failed: %w", err)
}
stateChanged := m.hasInterfaceStateChanged(ctx, iface)
oldEnabled, newEnabled := m.getInterfaceStateHistory(ctx, iface)
if err := m.handleInterfacePreSaveHooks(stateChanged, iface); err != nil {
if err := m.handleInterfacePreSaveHooks(iface, oldEnabled, newEnabled); err != nil {
return nil, fmt.Errorf("pre-save hooks failed: %w", err)
}
@@ -561,7 +561,7 @@ func (m Manager) saveInterface(ctx context.Context, iface *domain.Interface) (
m.bus.Publish(app.TopicRouteUpdate, "interface updated: "+string(iface.Identifier))
}
if err := m.handleInterfacePostSaveHooks(stateChanged, iface); err != nil {
if err := m.handleInterfacePostSaveHooks(iface, oldEnabled, newEnabled); err != nil {
return nil, fmt.Errorf("post-save hooks failed: %w", err)
}
@@ -576,32 +576,13 @@ func (m Manager) saveInterface(ctx context.Context, iface *domain.Interface) (
return iface, nil
}
func (m Manager) hasInterfaceStateChanged(ctx context.Context, iface *domain.Interface) bool {
func (m Manager) getInterfaceStateHistory(ctx context.Context, iface *domain.Interface) (oldEnabled, newEnabled bool) {
oldInterface, err := m.db.GetInterface(ctx, iface.Identifier)
if err != nil {
return false
return false, !iface.IsDisabled() // if the interface did not exist, we assume it was not enabled
}
if oldInterface.IsDisabled() != iface.IsDisabled() {
return true // interface in db has changed
}
wgInterface, err := m.wg.GetController(*iface).GetInterface(ctx, iface.Identifier)
if err != nil {
return true // interface might not exist - so we assume that there must be a change
}
// compare physical interface settings
if len(wgInterface.Addresses) != len(iface.Addresses) ||
wgInterface.Mtu != iface.Mtu ||
wgInterface.FirewallMark != iface.FirewallMark ||
wgInterface.ListenPort != iface.ListenPort ||
wgInterface.PrivateKey != iface.PrivateKey ||
wgInterface.PublicKey != iface.PublicKey {
return true
}
return false
return !oldInterface.IsDisabled(), !iface.IsDisabled()
}
func (m Manager) handleInterfacePreSaveActions(iface *domain.Interface) error {
@@ -617,12 +598,14 @@ func (m Manager) handleInterfacePreSaveActions(iface *domain.Interface) error {
return nil
}
func (m Manager) handleInterfacePreSaveHooks(stateChanged bool, iface *domain.Interface) error {
if !stateChanged {
func (m Manager) handleInterfacePreSaveHooks(iface *domain.Interface, oldEnabled, newEnabled bool) error {
if oldEnabled == newEnabled {
return nil // do nothing if state did not change
}
if !iface.IsDisabled() {
slog.Debug("executing pre-save hooks", "interface", iface.Identifier, "up", newEnabled)
if newEnabled {
if err := m.quick.ExecuteInterfaceHook(iface.Identifier, iface.PreUp); err != nil {
return fmt.Errorf("failed to execute pre-up hook: %w", err)
}
@@ -634,12 +617,14 @@ func (m Manager) handleInterfacePreSaveHooks(stateChanged bool, iface *domain.In
return nil
}
func (m Manager) handleInterfacePostSaveHooks(stateChanged bool, iface *domain.Interface) error {
if !stateChanged {
func (m Manager) handleInterfacePostSaveHooks(iface *domain.Interface, oldEnabled, newEnabled bool) error {
if oldEnabled == newEnabled {
return nil // do nothing if state did not change
}
if !iface.IsDisabled() {
slog.Debug("executing post-save hooks", "interface", iface.Identifier, "up", newEnabled)
if newEnabled {
if err := m.quick.ExecuteInterfaceHook(iface.Identifier, iface.PostUp); err != nil {
return fmt.Errorf("failed to execute post-up hook: %w", err)
}

View File

@@ -188,6 +188,30 @@ func (m Manager) CreatePeer(ctx context.Context, peer *domain.Peer) (*domain.Pee
sessionUser := domain.GetUserInfo(ctx)
// Enforce peer limit for non-admin users if LimitAdditionalUserPeers is set
if m.cfg.Core.SelfProvisioningAllowed && !sessionUser.IsAdmin && m.cfg.Advanced.LimitAdditionalUserPeers > 0 {
peers, err := m.db.GetUserPeers(ctx, peer.UserIdentifier)
if err != nil {
return nil, fmt.Errorf("failed to fetch peers for user %s: %w", peer.UserIdentifier, err)
}
// Count enabled peers (disabled IS NULL)
peerCount := 0
for _, p := range peers {
if !p.IsDisabled() {
peerCount++
}
}
totalAllowedPeers := 1 + m.cfg.Advanced.LimitAdditionalUserPeers // 1 default peer + x additional peers
if peerCount >= totalAllowedPeers {
slog.WarnContext(ctx, "peer creation blocked due to limit",
"user", peer.UserIdentifier,
"current_count", peerCount,
"allowed_count", totalAllowedPeers)
return nil, fmt.Errorf("peer limit reached (%d peers allowed): %w", totalAllowedPeers, domain.ErrNoPermission)
}
}
existingPeer, err := m.db.GetPeer(ctx, peer.Identifier)
if err != nil && !errors.Is(err, domain.ErrNotFound) {
return nil, fmt.Errorf("unable to load existing peer %s: %w", peer.Identifier, err)