diff --git a/internal/adapters/wgcontroller/local.go b/internal/adapters/wgcontroller/local.go index b359ae6..9e73d21 100644 --- a/internal/adapters/wgcontroller/local.go +++ b/internal/adapters/wgcontroller/local.go @@ -10,7 +10,9 @@ import ( "os" "os/exec" "strings" + "time" + probing "github.com/prometheus-community/pro-bing" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "golang.zx2c4.com/wireguard/wgctrl" @@ -812,3 +814,34 @@ func (c LocalController) DeleteRouteRules(_ context.Context, rules []domain.Rout } // endregion routing-related + +// region statistics-related + +func (c LocalController) PingAddresses( + ctx context.Context, + addr string, +) (*domain.PingerResult, error) { + pinger, err := probing.NewPinger(addr) + if err != nil { + return nil, fmt.Errorf("failed to instantiate pinger for %s: %w", addr, err) + } + + checkCount := 1 + pinger.SetPrivileged(!c.cfg.Statistics.PingUnprivileged) + pinger.Count = checkCount + pinger.Timeout = 2 * time.Second + err = pinger.RunWithContext(ctx) // Blocks until finished. + if err != nil { + return nil, fmt.Errorf("failed to ping %s: %w", addr, err) + } + + stats := pinger.Statistics() + + return &domain.PingerResult{ + PacketsRecv: stats.PacketsRecv, + PacketsSent: stats.PacketsSent, + Rtts: stats.Rtts, + }, nil +} + +// endregion statistics-related diff --git a/internal/adapters/wgcontroller/mikrotik.go b/internal/adapters/wgcontroller/mikrotik.go index 965053f..2e15e78 100644 --- a/internal/adapters/wgcontroller/mikrotik.go +++ b/internal/adapters/wgcontroller/mikrotik.go @@ -336,3 +336,40 @@ func (c MikrotikController) DeleteRouteRules(_ context.Context, rules []domain.R } // endregion routing-related + +// region statistics-related + +func (c MikrotikController) PingAddresses( + ctx context.Context, + addr string, +) (*domain.PingerResult, error) { + wgReply := c.client.ExecList(ctx, "/tool/ping", + // limit to 1 packet with a max running time of 2 seconds + lowlevel.GenericJsonObject{"address": addr, "count": 1, "interval": "00:00:02"}, + ) + + if wgReply.Status != lowlevel.MikrotikApiStatusOk { + return nil, fmt.Errorf("failed to ping %s: %v", addr, wgReply.Error) + } + + var result domain.PingerResult + for _, item := range wgReply.Data { + result.PacketsRecv += item.GetInt("received") + result.PacketsSent += item.GetInt("sent") + + rttStr := item.GetString("avg-rtt") + if rttStr != "" { + rtt, err := time.ParseDuration(rttStr) + if err == nil { + result.Rtts = append(result.Rtts, rtt) + } else { + // use a high value to indicate failure or timeout + result.Rtts = append(result.Rtts, 999999*time.Millisecond) + } + } + } + + return &result, nil +} + +// endregion statistics-related diff --git a/internal/app/wireguard/controller_manager.go b/internal/app/wireguard/controller_manager.go index 0211d86..ab1eaa9 100644 --- a/internal/app/wireguard/controller_manager.go +++ b/internal/app/wireguard/controller_manager.go @@ -30,6 +30,10 @@ type InterfaceController interface { updateFunc func(pp *domain.PhysicalPeer) (*domain.PhysicalPeer, error), ) error DeletePeer(_ context.Context, deviceId domain.InterfaceIdentifier, id domain.PeerIdentifier) error + PingAddresses( + ctx context.Context, + addr string, + ) (*domain.PingerResult, error) } type backendInstance struct { diff --git a/internal/app/wireguard/statistics.go b/internal/app/wireguard/statistics.go index 76455d6..29e8808 100644 --- a/internal/app/wireguard/statistics.go +++ b/internal/app/wireguard/statistics.go @@ -6,8 +6,6 @@ import ( "sync" "time" - probing "github.com/prometheus-community/pro-bing" - "github.com/h44z/wg-portal/internal/app" "github.com/h44z/wg-portal/internal/config" "github.com/h44z/wg-portal/internal/domain" @@ -30,11 +28,6 @@ type StatisticsDatabaseRepo interface { DeletePeerStatus(ctx context.Context, id domain.PeerIdentifier) error } -type StatisticsInterfaceController interface { - GetInterface(_ context.Context, id domain.InterfaceIdentifier) (*domain.PhysicalInterface, error) - GetPeers(_ context.Context, deviceId domain.InterfaceIdentifier) ([]domain.PhysicalPeer, error) -} - type StatisticsMetricsServer interface { UpdateInterfaceMetrics(status domain.InterfaceStatus) UpdatePeerMetrics(peer *domain.Peer, status domain.PeerStatus) @@ -45,12 +38,17 @@ type StatisticsEventBus interface { Subscribe(topic string, fn interface{}) error } +type pingJob struct { + Peer domain.Peer + Backend domain.InterfaceBackend +} + type StatisticsCollector struct { cfg *config.Config bus StatisticsEventBus pingWaitGroup sync.WaitGroup - pingJobs chan domain.Peer + pingJobs chan pingJob db StatisticsDatabaseRepo wg *ControllerManager @@ -245,7 +243,7 @@ func (c *StatisticsCollector) startPingWorkers(ctx context.Context) { c.pingWaitGroup = sync.WaitGroup{} c.pingWaitGroup.Add(c.cfg.Statistics.PingCheckWorkers) - c.pingJobs = make(chan domain.Peer, c.cfg.Statistics.PingCheckWorkers) + c.pingJobs = make(chan pingJob, c.cfg.Statistics.PingCheckWorkers) // start workers for i := 0; i < c.cfg.Statistics.PingCheckWorkers; i++ { @@ -288,7 +286,10 @@ func (c *StatisticsCollector) enqueuePingChecks(ctx context.Context) { continue } for _, peer := range peers { - c.pingJobs <- peer + c.pingJobs <- pingJob{ + Peer: peer, + Backend: in.Backend, + } } } } @@ -297,8 +298,10 @@ func (c *StatisticsCollector) enqueuePingChecks(ctx context.Context) { func (c *StatisticsCollector) pingWorker(ctx context.Context) { defer c.pingWaitGroup.Done() - for peer := range c.pingJobs { - peerPingable := c.isPeerPingable(ctx, peer) + for job := range c.pingJobs { + peer := job.Peer + backend := job.Backend + peerPingable := c.isPeerPingable(ctx, backend, peer) slog.Debug("peer ping check completed", "peer", peer.Identifier, "pingable", peerPingable) now := time.Now() @@ -325,7 +328,11 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) { } } -func (c *StatisticsCollector) isPeerPingable(ctx context.Context, peer domain.Peer) bool { +func (c *StatisticsCollector) isPeerPingable( + ctx context.Context, + backend domain.InterfaceBackend, + peer domain.Peer, +) bool { if !c.cfg.Statistics.UsePingChecks { return false } @@ -335,25 +342,13 @@ func (c *StatisticsCollector) isPeerPingable(ctx context.Context, peer domain.Pe return false } - // TODO: implement ping check on Mikrotik (or any other controller) - - pinger, err := probing.NewPinger(checkAddr) + stats, err := c.wg.GetControllerByName(backend).PingAddresses(ctx, checkAddr) if err != nil { - slog.Debug("failed to instantiate pinger", "peer", peer.Identifier, "address", checkAddr, "error", err) + slog.Debug("failed to ping peer", "peer", peer.Identifier, "error", err) return false } - checkCount := 1 - pinger.SetPrivileged(!c.cfg.Statistics.PingUnprivileged) - pinger.Count = checkCount - pinger.Timeout = 2 * time.Second - err = pinger.RunWithContext(ctx) // Blocks until finished. - if err != nil { - slog.Debug("pinger for peer exited unexpectedly", "peer", peer.Identifier, "address", checkAddr, "error", err) - return false - } - stats := pinger.Statistics() - return stats.PacketsRecv == checkCount + return stats.IsPingable() } func (c *StatisticsCollector) updateInterfaceMetrics(status domain.InterfaceStatus) { diff --git a/internal/domain/statistics.go b/internal/domain/statistics.go index 9d04a72..85cd914 100644 --- a/internal/domain/statistics.go +++ b/internal/domain/statistics.go @@ -1,6 +1,8 @@ package domain -import "time" +import ( + "time" +) type PeerStatus struct { PeerId PeerIdentifier `gorm:"primaryKey;column:identifier"` @@ -35,3 +37,25 @@ type InterfaceStatus struct { BytesReceived uint64 `gorm:"column:received"` BytesTransmitted uint64 `gorm:"column:transmitted"` } + +type PingerResult struct { + PacketsRecv int + PacketsSent int + Rtts []time.Duration +} + +func (r PingerResult) IsPingable() bool { + return r.PacketsRecv > 0 && r.PacketsSent > 0 && len(r.Rtts) > 0 +} + +func (r PingerResult) AverageRtt() time.Duration { + if len(r.Rtts) == 0 { + return 0 + } + + var total time.Duration + for _, rtt := range r.Rtts { + total += rtt + } + return total / time.Duration(len(r.Rtts)) +} diff --git a/internal/lowlevel/mikrotik.go b/internal/lowlevel/mikrotik.go index d7ca67a..f0bdaf8 100644 --- a/internal/lowlevel/mikrotik.go +++ b/internal/lowlevel/mikrotik.go @@ -1,6 +1,7 @@ package lowlevel import ( + "bytes" "context" "crypto/tls" "encoding/json" @@ -198,7 +199,7 @@ func (m *MikrotikApiClient) getFullPath(command string) string { } func (m *MikrotikApiClient) prepareGetRequest(ctx context.Context, fullUrl string) (*http.Request, error) { - req, err := http.NewRequestWithContext(ctx, "GET", fullUrl, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fullUrl, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } @@ -210,6 +211,30 @@ func (m *MikrotikApiClient) prepareGetRequest(ctx context.Context, fullUrl strin return req, nil } +func (m *MikrotikApiClient) preparePostRequest( + ctx context.Context, + fullUrl string, + payload GenericJsonObject, +) (*http.Request, error) { + // marshal the payload to JSON + payloadBytes, err := json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("failed to marshal payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullUrl, bytes.NewReader(payloadBytes)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + if m.cfg.ApiUser != "" && m.cfg.ApiPassword != "" { + req.SetBasicAuth(m.cfg.ApiUser, m.cfg.ApiPassword) + } + + return req, nil +} + func errToApiResponse[T any](code int, message string, err error) MikrotikApiResponse[T] { return MikrotikApiResponse[T]{ Status: MikrotikApiStatusError, @@ -296,4 +321,27 @@ func (m *MikrotikApiClient) Get( return response } +func (m *MikrotikApiClient) ExecList( + ctx context.Context, + command string, + payload GenericJsonObject, +) MikrotikApiResponse[[]GenericJsonObject] { + apiCtx, cancel := context.WithTimeout(ctx, m.cfg.ApiTimeout) + defer cancel() + + fullUrl := m.getFullPath(command) + + req, err := m.preparePostRequest(apiCtx, fullUrl, payload) + if err != nil { + return errToApiResponse[[]GenericJsonObject](MikrotikApiErrorCodeRequestPreparationFailed, + "failed to create request", err) + } + + start := time.Now() + m.debugLog("executing API get", "url", fullUrl) + response := parseHttpResponse[[]GenericJsonObject](m.client.Do(req)) + m.debugLog("retrieved API get result", "url", fullUrl, "duration", time.Since(start).String()) + return response +} + // endregion API-client