wip: ping handler per backend (#426)

This commit is contained in:
Christoph Haas
2025-06-01 09:50:46 +02:00
parent ea6da4114f
commit c612b5bbb1
6 changed files with 171 additions and 30 deletions

View File

@@ -30,6 +30,10 @@ type InterfaceController interface {
updateFunc func(pp *domain.PhysicalPeer) (*domain.PhysicalPeer, error),
) error
DeletePeer(_ context.Context, deviceId domain.InterfaceIdentifier, id domain.PeerIdentifier) error
PingAddresses(
ctx context.Context,
addr string,
) (*domain.PingerResult, error)
}
type backendInstance struct {

View File

@@ -6,8 +6,6 @@ import (
"sync"
"time"
probing "github.com/prometheus-community/pro-bing"
"github.com/h44z/wg-portal/internal/app"
"github.com/h44z/wg-portal/internal/config"
"github.com/h44z/wg-portal/internal/domain"
@@ -30,11 +28,6 @@ type StatisticsDatabaseRepo interface {
DeletePeerStatus(ctx context.Context, id domain.PeerIdentifier) error
}
type StatisticsInterfaceController interface {
GetInterface(_ context.Context, id domain.InterfaceIdentifier) (*domain.PhysicalInterface, error)
GetPeers(_ context.Context, deviceId domain.InterfaceIdentifier) ([]domain.PhysicalPeer, error)
}
type StatisticsMetricsServer interface {
UpdateInterfaceMetrics(status domain.InterfaceStatus)
UpdatePeerMetrics(peer *domain.Peer, status domain.PeerStatus)
@@ -45,12 +38,17 @@ type StatisticsEventBus interface {
Subscribe(topic string, fn interface{}) error
}
type pingJob struct {
Peer domain.Peer
Backend domain.InterfaceBackend
}
type StatisticsCollector struct {
cfg *config.Config
bus StatisticsEventBus
pingWaitGroup sync.WaitGroup
pingJobs chan domain.Peer
pingJobs chan pingJob
db StatisticsDatabaseRepo
wg *ControllerManager
@@ -245,7 +243,7 @@ func (c *StatisticsCollector) startPingWorkers(ctx context.Context) {
c.pingWaitGroup = sync.WaitGroup{}
c.pingWaitGroup.Add(c.cfg.Statistics.PingCheckWorkers)
c.pingJobs = make(chan domain.Peer, c.cfg.Statistics.PingCheckWorkers)
c.pingJobs = make(chan pingJob, c.cfg.Statistics.PingCheckWorkers)
// start workers
for i := 0; i < c.cfg.Statistics.PingCheckWorkers; i++ {
@@ -288,7 +286,10 @@ func (c *StatisticsCollector) enqueuePingChecks(ctx context.Context) {
continue
}
for _, peer := range peers {
c.pingJobs <- peer
c.pingJobs <- pingJob{
Peer: peer,
Backend: in.Backend,
}
}
}
}
@@ -297,8 +298,10 @@ func (c *StatisticsCollector) enqueuePingChecks(ctx context.Context) {
func (c *StatisticsCollector) pingWorker(ctx context.Context) {
defer c.pingWaitGroup.Done()
for peer := range c.pingJobs {
peerPingable := c.isPeerPingable(ctx, peer)
for job := range c.pingJobs {
peer := job.Peer
backend := job.Backend
peerPingable := c.isPeerPingable(ctx, backend, peer)
slog.Debug("peer ping check completed", "peer", peer.Identifier, "pingable", peerPingable)
now := time.Now()
@@ -325,7 +328,11 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) {
}
}
func (c *StatisticsCollector) isPeerPingable(ctx context.Context, peer domain.Peer) bool {
func (c *StatisticsCollector) isPeerPingable(
ctx context.Context,
backend domain.InterfaceBackend,
peer domain.Peer,
) bool {
if !c.cfg.Statistics.UsePingChecks {
return false
}
@@ -335,25 +342,13 @@ func (c *StatisticsCollector) isPeerPingable(ctx context.Context, peer domain.Pe
return false
}
// TODO: implement ping check on Mikrotik (or any other controller)
pinger, err := probing.NewPinger(checkAddr)
stats, err := c.wg.GetControllerByName(backend).PingAddresses(ctx, checkAddr)
if err != nil {
slog.Debug("failed to instantiate pinger", "peer", peer.Identifier, "address", checkAddr, "error", err)
slog.Debug("failed to ping peer", "peer", peer.Identifier, "error", err)
return false
}
checkCount := 1
pinger.SetPrivileged(!c.cfg.Statistics.PingUnprivileged)
pinger.Count = checkCount
pinger.Timeout = 2 * time.Second
err = pinger.RunWithContext(ctx) // Blocks until finished.
if err != nil {
slog.Debug("pinger for peer exited unexpectedly", "peer", peer.Identifier, "address", checkAddr, "error", err)
return false
}
stats := pinger.Statistics()
return stats.PacketsRecv == checkCount
return stats.IsPingable()
}
func (c *StatisticsCollector) updateInterfaceMetrics(status domain.InterfaceStatus) {