feat: add live traffic stats (#530) (#616)

This commit is contained in:
h44z
2026-01-26 22:24:10 +01:00
committed by GitHub
parent e53b8c8087
commit 70cc44cc4d
13 changed files with 289 additions and 5 deletions

View File

@@ -1,6 +1,8 @@
package logging
import (
"bufio"
"net"
"net/http"
)
@@ -38,6 +40,12 @@ func (w *writerWrapper) Write(data []byte) (int, error) {
return n, err
}
// Hijack wraps the Hijack method of the ResponseWriter and returns the hijacked connection.
// This is required for websockets to work.
func (w *writerWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return http.NewResponseController(w.ResponseWriter).Hijack()
}
// newWriterWrapper returns a new writerWrapper that wraps the given http.ResponseWriter.
// It initializes the StatusCode to http.StatusOK.
func newWriterWrapper(w http.ResponseWriter) *writerWrapper {

View File

@@ -0,0 +1,100 @@
package handlers
import (
"context"
"net/http"
"strings"
"sync"
"github.com/go-pkgz/routegroup"
"github.com/gorilla/websocket"
"github.com/h44z/wg-portal/internal/app"
"github.com/h44z/wg-portal/internal/config"
"github.com/h44z/wg-portal/internal/domain"
)
type WebsocketEventBus interface {
Subscribe(topic string, fn any) error
Unsubscribe(topic string, fn any) error
}
type WebsocketEndpoint struct {
authenticator Authenticator
bus WebsocketEventBus
upgrader websocket.Upgrader
}
func NewWebsocketEndpoint(cfg *config.Config, auth Authenticator, bus WebsocketEventBus) *WebsocketEndpoint {
return &WebsocketEndpoint{
authenticator: auth,
bus: bus,
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
origin := r.Header.Get("Origin")
return strings.HasPrefix(origin, cfg.Web.ExternalUrl)
},
},
}
}
func (e WebsocketEndpoint) GetName() string {
return "WebsocketEndpoint"
}
func (e WebsocketEndpoint) RegisterRoutes(g *routegroup.Bundle) {
g.With(e.authenticator.LoggedIn()).HandleFunc("GET /ws", e.handleWebsocket())
}
// wsMessage represents a message sent over websocket to the frontend
type wsMessage struct {
Type string `json:"type"` // either "peer_stats" or "interface_stats"
Data any `json:"data"` // domain.TrafficDelta
}
func (e WebsocketEndpoint) handleWebsocket() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn, err := e.upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
writeMutex := sync.Mutex{}
writeJSON := func(msg wsMessage) error {
writeMutex.Lock()
defer writeMutex.Unlock()
return conn.WriteJSON(msg)
}
peerStatsHandler := func(status domain.TrafficDelta) {
_ = writeJSON(wsMessage{Type: "peer_stats", Data: status})
}
interfaceStatsHandler := func(status domain.TrafficDelta) {
_ = writeJSON(wsMessage{Type: "interface_stats", Data: status})
}
_ = e.bus.Subscribe(app.TopicPeerStatsUpdated, peerStatsHandler)
defer e.bus.Unsubscribe(app.TopicPeerStatsUpdated, peerStatsHandler)
_ = e.bus.Subscribe(app.TopicInterfaceStatsUpdated, interfaceStatsHandler)
defer e.bus.Unsubscribe(app.TopicInterfaceStatsUpdated, interfaceStatsHandler)
// Keep connection open until client disconnects or context is cancelled
go func() {
for {
if _, _, err := conn.ReadMessage(); err != nil {
cancel()
return
}
}
}()
<-ctx.Done()
}
}

View File

@@ -26,6 +26,7 @@ const TopicUserEnabled = "user:enabled"
const TopicInterfaceCreated = "interface:created"
const TopicInterfaceUpdated = "interface:updated"
const TopicInterfaceDeleted = "interface:deleted"
const TopicInterfaceStatsUpdated = "interface:stats:updated"
// endregion interface-events
@@ -37,6 +38,7 @@ const TopicPeerUpdated = "peer:updated"
const TopicPeerInterfaceUpdated = "peer:interface:updated"
const TopicPeerIdentifierUpdated = "peer:identifier:updated"
const TopicPeerStateChanged = "peer:state:changed"
const TopicPeerStatsUpdated = "peer:stats:updated"
// endregion peer-events

View File

@@ -121,15 +121,25 @@ func (c *StatisticsCollector) collectInterfaceData(ctx context.Context) {
"error", err)
continue
}
now := time.Now()
err = c.db.UpdateInterfaceStatus(ctx, in.Identifier,
func(i *domain.InterfaceStatus) (*domain.InterfaceStatus, error) {
i.UpdatedAt = time.Now()
td := domain.CalculateTrafficDelta(
string(in.Identifier),
i.UpdatedAt, now,
i.BytesTransmitted, physicalInterface.BytesUpload,
i.BytesReceived, physicalInterface.BytesDownload,
)
i.UpdatedAt = now
i.BytesReceived = physicalInterface.BytesDownload
i.BytesTransmitted = physicalInterface.BytesUpload
// Update prometheus metrics
go c.updateInterfaceMetrics(*i)
// Publish stats update event
c.bus.Publish(app.TopicInterfaceStatsUpdated, td)
return i, nil
})
if err != nil {
@@ -172,6 +182,7 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
slog.Warn("failed to fetch peers for data collection", "interface", in.Identifier, "error", err)
continue
}
now := time.Now()
for _, peer := range peers {
var connectionStateChanged bool
var newPeerStatus domain.PeerStatus
@@ -184,8 +195,15 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
lastHandshake = &peer.LastHandshake
}
td := domain.CalculateTrafficDelta(
string(peer.Identifier),
p.UpdatedAt, now,
p.BytesTransmitted, peer.BytesDownload,
p.BytesReceived, peer.BytesUpload,
)
// calculate if session was restarted
p.UpdatedAt = time.Now()
p.UpdatedAt = now
p.LastSessionStart = getSessionStartTime(*p, peer.BytesUpload, peer.BytesDownload,
lastHandshake)
p.BytesReceived = peer.BytesUpload // store bytes that where uploaded from the peer and received by the server
@@ -195,7 +213,8 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
p.CalcConnected()
if wasConnected != p.IsConnected {
slog.Debug("peer connection state changed", "peer", peer.Identifier, "connected", p.IsConnected)
slog.Debug("peer connection state changed",
"peer", peer.Identifier, "connected", p.IsConnected)
connectionStateChanged = true
newPeerStatus = *p // store new status for event publishing
}
@@ -203,6 +222,9 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
// Update prometheus metrics
go c.updatePeerMetrics(ctx, *p)
// Publish stats update event
c.bus.Publish(app.TopicPeerStatsUpdated, td)
return p, nil
})
if err != nil {