Mikrotik integration (#467)
Some checks failed
Docker / Build and Push (push) Has been cancelled
github-pages / deploy (push) Has been cancelled
Docker / release (push) Has been cancelled

Allow MikroTik routes as WireGuard backends
This commit is contained in:
h44z
2025-08-10 14:42:02 +02:00
committed by GitHub
parent a86f83a219
commit 112f6bfb77
40 changed files with 3150 additions and 205 deletions

View File

@@ -6,8 +6,6 @@ import (
"sync"
"time"
probing "github.com/prometheus-community/pro-bing"
"github.com/h44z/wg-portal/internal/app"
"github.com/h44z/wg-portal/internal/config"
"github.com/h44z/wg-portal/internal/domain"
@@ -30,11 +28,6 @@ type StatisticsDatabaseRepo interface {
DeletePeerStatus(ctx context.Context, id domain.PeerIdentifier) error
}
type StatisticsInterfaceController interface {
GetInterface(_ context.Context, id domain.InterfaceIdentifier) (*domain.PhysicalInterface, error)
GetPeers(_ context.Context, deviceId domain.InterfaceIdentifier) ([]domain.PhysicalPeer, error)
}
type StatisticsMetricsServer interface {
UpdateInterfaceMetrics(status domain.InterfaceStatus)
UpdatePeerMetrics(peer *domain.Peer, status domain.PeerStatus)
@@ -47,15 +40,20 @@ type StatisticsEventBus interface {
Publish(topic string, args ...any)
}
type pingJob struct {
Peer domain.Peer
Backend domain.InterfaceBackend
}
type StatisticsCollector struct {
cfg *config.Config
bus StatisticsEventBus
pingWaitGroup sync.WaitGroup
pingJobs chan domain.Peer
pingJobs chan pingJob
db StatisticsDatabaseRepo
wg StatisticsInterfaceController
wg *ControllerManager
ms StatisticsMetricsServer
peerChangeEvent chan domain.PeerIdentifier
@@ -66,7 +64,7 @@ func NewStatisticsCollector(
cfg *config.Config,
bus StatisticsEventBus,
db StatisticsDatabaseRepo,
wg StatisticsInterfaceController,
wg *ControllerManager,
ms StatisticsMetricsServer,
) (*StatisticsCollector, error) {
c := &StatisticsCollector{
@@ -117,7 +115,7 @@ func (c *StatisticsCollector) collectInterfaceData(ctx context.Context) {
}
for _, in := range interfaces {
physicalInterface, err := c.wg.GetInterface(ctx, in.Identifier)
physicalInterface, err := c.wg.GetController(in).GetInterface(ctx, in.Identifier)
if err != nil {
slog.Warn("failed to load physical interface for data collection", "interface", in.Identifier,
"error", err)
@@ -169,7 +167,7 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
}
for _, in := range interfaces {
peers, err := c.wg.GetPeers(ctx, in.Identifier)
peers, err := c.wg.GetController(in).GetPeers(ctx, in.Identifier)
if err != nil {
slog.Warn("failed to fetch peers for data collection", "interface", in.Identifier, "error", err)
continue
@@ -271,7 +269,7 @@ func (c *StatisticsCollector) startPingWorkers(ctx context.Context) {
c.pingWaitGroup = sync.WaitGroup{}
c.pingWaitGroup.Add(c.cfg.Statistics.PingCheckWorkers)
c.pingJobs = make(chan domain.Peer, c.cfg.Statistics.PingCheckWorkers)
c.pingJobs = make(chan pingJob, c.cfg.Statistics.PingCheckWorkers)
// start workers
for i := 0; i < c.cfg.Statistics.PingCheckWorkers; i++ {
@@ -314,7 +312,10 @@ func (c *StatisticsCollector) enqueuePingChecks(ctx context.Context) {
continue
}
for _, peer := range peers {
c.pingJobs <- peer
c.pingJobs <- pingJob{
Peer: peer,
Backend: in.Backend,
}
}
}
}
@@ -323,11 +324,14 @@ func (c *StatisticsCollector) enqueuePingChecks(ctx context.Context) {
func (c *StatisticsCollector) pingWorker(ctx context.Context) {
defer c.pingWaitGroup.Done()
for peer := range c.pingJobs {
for job := range c.pingJobs {
peer := job.Peer
backend := job.Backend
var connectionStateChanged bool
var newPeerStatus domain.PeerStatus
peerPingable := c.isPeerPingable(ctx, peer)
peerPingable := c.isPeerPingable(ctx, backend, peer)
slog.Debug("peer ping check completed", "peer", peer.Identifier, "pingable", peerPingable)
now := time.Now()
@@ -368,7 +372,11 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) {
}
}
func (c *StatisticsCollector) isPeerPingable(ctx context.Context, peer domain.Peer) bool {
func (c *StatisticsCollector) isPeerPingable(
ctx context.Context,
backend domain.InterfaceBackend,
peer domain.Peer,
) bool {
if !c.cfg.Statistics.UsePingChecks {
return false
}
@@ -378,23 +386,13 @@ func (c *StatisticsCollector) isPeerPingable(ctx context.Context, peer domain.Pe
return false
}
pinger, err := probing.NewPinger(checkAddr)
stats, err := c.wg.GetControllerByName(backend).PingAddresses(ctx, checkAddr)
if err != nil {
slog.Debug("failed to instantiate pinger", "peer", peer.Identifier, "address", checkAddr, "error", err)
slog.Debug("failed to ping peer", "peer", peer.Identifier, "error", err)
return false
}
checkCount := 1
pinger.SetPrivileged(!c.cfg.Statistics.PingUnprivileged)
pinger.Count = checkCount
pinger.Timeout = 2 * time.Second
err = pinger.RunWithContext(ctx) // Blocks until finished.
if err != nil {
slog.Debug("pinger for peer exited unexpectedly", "peer", peer.Identifier, "address", checkAddr, "error", err)
return false
}
stats := pinger.Statistics()
return stats.PacketsRecv == checkCount
return stats.IsPingable()
}
func (c *StatisticsCollector) updateInterfaceMetrics(status domain.InterfaceStatus) {