2023-08-04 13:34:18 +02:00
package wireguard
import (
"context"
2025-03-02 08:51:13 +01:00
"log/slog"
2024-09-29 22:10:50 +02:00
"sync"
"time"
2025-02-28 08:29:40 +01:00
"github.com/h44z/wg-portal/internal/app"
"github.com/h44z/wg-portal/internal/config"
"github.com/h44z/wg-portal/internal/domain"
2023-08-04 13:34:18 +02:00
)
2025-03-23 23:09:47 +01:00
type StatisticsDatabaseRepo interface {
GetAllInterfaces ( ctx context . Context ) ( [ ] domain . Interface , error )
GetInterfacePeers ( ctx context . Context , id domain . InterfaceIdentifier ) ( [ ] domain . Peer , error )
GetPeer ( ctx context . Context , id domain . PeerIdentifier ) ( * domain . Peer , error )
UpdatePeerStatus (
ctx context . Context ,
id domain . PeerIdentifier ,
updateFunc func ( in * domain . PeerStatus ) ( * domain . PeerStatus , error ) ,
) error
UpdateInterfaceStatus (
ctx context . Context ,
id domain . InterfaceIdentifier ,
updateFunc func ( in * domain . InterfaceStatus ) ( * domain . InterfaceStatus , error ) ,
) error
DeletePeerStatus ( ctx context . Context , id domain . PeerIdentifier ) error
}
type StatisticsMetricsServer interface {
UpdateInterfaceMetrics ( status domain . InterfaceStatus )
UpdatePeerMetrics ( peer * domain . Peer , status domain . PeerStatus )
}
type StatisticsEventBus interface {
// Subscribe subscribes to a topic
Subscribe ( topic string , fn interface { } ) error
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
// Publish sends a message to the message bus.
Publish ( topic string , args ... any )
2025-03-23 23:09:47 +01:00
}
2025-08-10 14:42:02 +02:00
type pingJob struct {
Peer domain . Peer
Backend domain . InterfaceBackend
}
2023-08-04 13:34:18 +02:00
type StatisticsCollector struct {
cfg * config . Config
2025-03-23 23:09:47 +01:00
bus StatisticsEventBus
2023-08-04 13:34:18 +02:00
pingWaitGroup sync . WaitGroup
2025-08-10 14:42:02 +02:00
pingJobs chan pingJob
2023-08-04 13:34:18 +02:00
db StatisticsDatabaseRepo
2025-08-10 14:42:02 +02:00
wg * ControllerManager
2025-03-23 23:09:47 +01:00
ms StatisticsMetricsServer
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
peerChangeEvent chan domain . PeerIdentifier
2023-08-04 13:34:18 +02:00
}
2025-03-23 23:09:47 +01:00
// NewStatisticsCollector creates a new statistics collector.
2025-01-05 11:30:34 +01:00
func NewStatisticsCollector (
cfg * config . Config ,
2025-03-23 23:09:47 +01:00
bus StatisticsEventBus ,
2025-01-05 11:30:34 +01:00
db StatisticsDatabaseRepo ,
2025-08-10 14:42:02 +02:00
wg * ControllerManager ,
2025-03-23 23:09:47 +01:00
ms StatisticsMetricsServer ,
2025-01-05 11:30:34 +01:00
) ( * StatisticsCollector , error ) {
c := & StatisticsCollector {
2023-08-04 13:34:18 +02:00
cfg : cfg ,
2025-01-05 11:30:34 +01:00
bus : bus ,
2023-08-04 13:34:18 +02:00
db : db ,
wg : wg ,
2024-09-29 22:10:50 +02:00
ms : ms ,
2025-01-05 11:30:34 +01:00
}
c . connectToMessageBus ( )
return c , nil
2023-08-04 13:34:18 +02:00
}
2025-03-23 23:09:47 +01:00
// StartBackgroundJobs starts the background jobs for the statistics collector.
// This method is non-blocking and returns immediately.
2023-08-04 13:34:18 +02:00
func ( c * StatisticsCollector ) StartBackgroundJobs ( ctx context . Context ) {
c . startPingWorkers ( ctx )
c . startInterfaceDataFetcher ( ctx )
c . startPeerDataFetcher ( ctx )
}
func ( c * StatisticsCollector ) startInterfaceDataFetcher ( ctx context . Context ) {
if ! c . cfg . Statistics . CollectInterfaceData {
return
}
go c . collectInterfaceData ( ctx )
2025-03-02 08:51:13 +01:00
slog . Debug ( "started interface data fetcher" )
2023-08-04 13:34:18 +02:00
}
func ( c * StatisticsCollector ) collectInterfaceData ( ctx context . Context ) {
// Start ticker
ticker := time . NewTicker ( c . cfg . Statistics . DataCollectionInterval )
defer ticker . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return // program stopped
case <- ticker . C :
interfaces , err := c . db . GetAllInterfaces ( ctx )
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to fetch all interfaces for data collection" , "error" , err )
2023-08-04 13:34:18 +02:00
continue
}
for _ , in := range interfaces {
2025-08-10 14:42:02 +02:00
physicalInterface , err := c . wg . GetController ( in ) . GetInterface ( ctx , in . Identifier )
2023-08-04 13:34:18 +02:00
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to load physical interface for data collection" , "interface" , in . Identifier ,
"error" , err )
2023-08-04 13:34:18 +02:00
continue
}
2026-01-26 22:24:10 +01:00
now := time . Now ( )
2025-01-05 11:30:34 +01:00
err = c . db . UpdateInterfaceStatus ( ctx , in . Identifier ,
func ( i * domain . InterfaceStatus ) ( * domain . InterfaceStatus , error ) {
2026-01-26 22:24:10 +01:00
td := domain . CalculateTrafficDelta (
string ( in . Identifier ) ,
i . UpdatedAt , now ,
i . BytesTransmitted , physicalInterface . BytesUpload ,
i . BytesReceived , physicalInterface . BytesDownload ,
)
i . UpdatedAt = now
2025-01-05 11:30:34 +01:00
i . BytesReceived = physicalInterface . BytesDownload
i . BytesTransmitted = physicalInterface . BytesUpload
2024-09-29 22:10:50 +02:00
2025-01-05 11:30:34 +01:00
// Update prometheus metrics
go c . updateInterfaceMetrics ( * i )
2024-10-09 22:33:50 +02:00
2026-01-26 22:24:10 +01:00
// Publish stats update event
c . bus . Publish ( app . TopicInterfaceStatsUpdated , td )
2025-01-05 11:30:34 +01:00
return i , nil
} )
2023-08-04 13:34:18 +02:00
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to update interface status" , "interface" , in . Identifier , "error" , err )
2023-08-04 13:34:18 +02:00
}
2025-03-02 08:51:13 +01:00
slog . Debug ( "updated interface status" , "interface" , in . Identifier )
2023-08-04 13:34:18 +02:00
}
}
}
}
func ( c * StatisticsCollector ) startPeerDataFetcher ( ctx context . Context ) {
if ! c . cfg . Statistics . CollectPeerData {
return
}
go c . collectPeerData ( ctx )
2025-03-02 08:51:13 +01:00
slog . Debug ( "started peer data fetcher" )
2023-08-04 13:34:18 +02:00
}
func ( c * StatisticsCollector ) collectPeerData ( ctx context . Context ) {
// Start ticker
ticker := time . NewTicker ( c . cfg . Statistics . DataCollectionInterval )
defer ticker . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return // program stopped
case <- ticker . C :
interfaces , err := c . db . GetAllInterfaces ( ctx )
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to fetch all interfaces for peer data collection" , "error" , err )
2023-08-04 13:34:18 +02:00
continue
}
for _ , in := range interfaces {
2025-08-10 14:42:02 +02:00
peers , err := c . wg . GetController ( in ) . GetPeers ( ctx , in . Identifier )
2023-08-04 13:34:18 +02:00
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to fetch peers for data collection" , "interface" , in . Identifier , "error" , err )
2023-08-04 13:34:18 +02:00
continue
}
2026-01-26 22:24:10 +01:00
now := time . Now ( )
2023-08-04 13:34:18 +02:00
for _ , peer := range peers {
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
var connectionStateChanged bool
var newPeerStatus domain . PeerStatus
2025-01-05 11:30:34 +01:00
err = c . db . UpdatePeerStatus ( ctx , peer . Identifier ,
func ( p * domain . PeerStatus ) ( * domain . PeerStatus , error ) {
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
wasConnected := p . IsConnected
2025-01-05 11:30:34 +01:00
var lastHandshake * time . Time
if ! peer . LastHandshake . IsZero ( ) {
lastHandshake = & peer . LastHandshake
}
2026-01-26 22:24:10 +01:00
td := domain . CalculateTrafficDelta (
string ( peer . Identifier ) ,
p . UpdatedAt , now ,
p . BytesTransmitted , peer . BytesDownload ,
p . BytesReceived , peer . BytesUpload ,
)
2025-01-05 11:30:34 +01:00
// calculate if session was restarted
2026-01-26 22:24:10 +01:00
p . UpdatedAt = now
2026-03-19 23:11:40 +01:00
p . LastSessionStart = c . getSessionStartTime ( * p , peer . BytesUpload , peer . BytesDownload ,
2025-01-05 11:30:34 +01:00
lastHandshake )
p . BytesReceived = peer . BytesUpload // store bytes that where uploaded from the peer and received by the server
p . BytesTransmitted = peer . BytesDownload // store bytes that where received from the peer and sent by the server
p . Endpoint = peer . Endpoint
p . LastHandshake = lastHandshake
2026-03-19 23:11:40 +01:00
p . CalcConnected ( c . cfg . Backend . ReKeyTimeoutInterval )
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
if wasConnected != p . IsConnected {
2026-01-26 22:24:10 +01:00
slog . Debug ( "peer connection state changed" ,
"peer" , peer . Identifier , "connected" , p . IsConnected )
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
connectionStateChanged = true
newPeerStatus = * p // store new status for event publishing
}
2025-01-05 11:30:34 +01:00
// Update prometheus metrics
go c . updatePeerMetrics ( ctx , * p )
2026-01-26 22:24:10 +01:00
// Publish stats update event
c . bus . Publish ( app . TopicPeerStatsUpdated , td )
2025-01-05 11:30:34 +01:00
return p , nil
} )
2023-08-04 13:34:18 +02:00
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to update peer status" , "peer" , peer . Identifier , "error" , err )
2025-01-05 11:30:34 +01:00
} else {
2025-03-02 08:51:13 +01:00
slog . Debug ( "updated peer status" , "peer" , peer . Identifier )
2023-08-04 13:34:18 +02:00
}
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
if connectionStateChanged {
2025-06-29 19:49:01 +02:00
peerModel , err := c . db . GetPeer ( ctx , peer . Identifier )
if err != nil {
slog . Error ( "failed to fetch peer for data collection" , "peer" , peer . Identifier , "error" ,
err )
continue
}
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
// publish event if connection state changed
2025-06-29 19:49:01 +02:00
c . bus . Publish ( app . TopicPeerStateChanged , newPeerStatus , * peerModel )
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
}
2023-08-04 13:34:18 +02:00
}
}
}
}
}
2026-03-19 23:11:40 +01:00
func ( c * StatisticsCollector ) getSessionStartTime (
2025-01-05 11:30:34 +01:00
oldStats domain . PeerStatus ,
newReceived , newTransmitted uint64 ,
latestHandshake * time . Time ,
) * time . Time {
2023-08-04 13:34:18 +02:00
if latestHandshake == nil {
return nil // currently not connected
}
2026-03-19 23:11:40 +01:00
oldestHandshakeTime := time . Now ( ) . Add ( - 1 * c . cfg . Backend . ReKeyTimeoutInterval ) // if a handshake is older than the rekey interval + grace-period, the peer is no longer connected
2023-08-04 13:34:18 +02:00
switch {
// old session was never initiated
case oldStats . BytesReceived == 0 && oldStats . BytesTransmitted == 0 && ( newReceived > 0 || newTransmitted > 0 ) :
return latestHandshake
// session never received bytes -> first receive
case oldStats . BytesReceived == 0 && newReceived > 0 && ( oldStats . LastHandshake == nil || oldStats . LastHandshake . Before ( oldestHandshakeTime ) ) :
return latestHandshake
// session never transmitted bytes -> first transmit
case oldStats . BytesTransmitted == 0 && newTransmitted > 0 && ( oldStats . LastSessionStart == nil || oldStats . LastHandshake . Before ( oldestHandshakeTime ) ) :
return latestHandshake
// session restarted as newer send or transmit counts are lower
case ( newReceived != 0 && newReceived < oldStats . BytesReceived ) || ( newTransmitted != 0 && newTransmitted < oldStats . BytesTransmitted ) :
return latestHandshake
// session initiated (but some bytes were already transmitted
case oldStats . LastSessionStart == nil && ( newReceived > oldStats . BytesReceived || newTransmitted > oldStats . BytesTransmitted ) :
return latestHandshake
default :
return oldStats . LastSessionStart
}
}
func ( c * StatisticsCollector ) startPingWorkers ( ctx context . Context ) {
if ! c . cfg . Statistics . UsePingChecks {
return
}
if c . pingJobs != nil {
return // already started
}
c . pingWaitGroup = sync . WaitGroup { }
c . pingWaitGroup . Add ( c . cfg . Statistics . PingCheckWorkers )
2025-08-10 14:42:02 +02:00
c . pingJobs = make ( chan pingJob , c . cfg . Statistics . PingCheckWorkers )
2023-08-04 13:34:18 +02:00
// start workers
for i := 0 ; i < c . cfg . Statistics . PingCheckWorkers ; i ++ {
go c . pingWorker ( ctx )
}
// start cleanup goroutine
go func ( ) {
c . pingWaitGroup . Wait ( )
2025-03-02 08:51:13 +01:00
slog . Debug ( "stopped ping checks" )
2023-08-04 13:34:18 +02:00
} ( )
go c . enqueuePingChecks ( ctx )
2025-03-02 08:51:13 +01:00
slog . Debug ( "started ping checks" )
2023-08-04 13:34:18 +02:00
}
func ( c * StatisticsCollector ) enqueuePingChecks ( ctx context . Context ) {
// Start ticker
ticker := time . NewTicker ( c . cfg . Statistics . PingCheckInterval )
defer ticker . Stop ( )
defer close ( c . pingJobs )
for {
select {
case <- ctx . Done ( ) :
return // program stopped
case <- ticker . C :
interfaces , err := c . db . GetAllInterfaces ( ctx )
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to fetch all interfaces for ping checks" , "error" , err )
2023-08-04 13:34:18 +02:00
continue
}
for _ , in := range interfaces {
peers , err := c . db . GetInterfacePeers ( ctx , in . Identifier )
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to fetch peers for ping checks" , "interface" , in . Identifier , "error" , err )
2023-08-04 13:34:18 +02:00
continue
}
for _ , peer := range peers {
2025-08-10 14:42:02 +02:00
c . pingJobs <- pingJob {
Peer : peer ,
Backend : in . Backend ,
}
2023-08-04 13:34:18 +02:00
}
}
}
}
}
func ( c * StatisticsCollector ) pingWorker ( ctx context . Context ) {
defer c . pingWaitGroup . Done ( )
2025-08-10 14:42:02 +02:00
for job := range c . pingJobs {
peer := job . Peer
backend := job . Backend
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
var connectionStateChanged bool
var newPeerStatus domain . PeerStatus
2025-08-10 14:42:02 +02:00
peerPingable := c . isPeerPingable ( ctx , backend , peer )
2025-03-02 08:51:13 +01:00
slog . Debug ( "peer ping check completed" , "peer" , peer . Identifier , "pingable" , peerPingable )
2025-01-05 11:30:34 +01:00
now := time . Now ( )
err := c . db . UpdatePeerStatus ( ctx , peer . Identifier ,
func ( p * domain . PeerStatus ) ( * domain . PeerStatus , error ) {
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
wasConnected := p . IsConnected
2025-01-05 11:30:34 +01:00
if peerPingable {
p . IsPingable = true
p . LastPing = & now
} else {
p . IsPingable = false
p . LastPing = nil
}
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
p . UpdatedAt = time . Now ( )
2026-03-19 23:11:40 +01:00
p . CalcConnected ( c . cfg . Backend . ReKeyTimeoutInterval )
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
if wasConnected != p . IsConnected {
connectionStateChanged = true
newPeerStatus = * p // store new status for event publishing
}
2025-01-05 11:30:34 +01:00
// Update prometheus metrics
go c . updatePeerMetrics ( ctx , * p )
return p , nil
} )
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to update peer ping status" , "peer" , peer . Identifier , "error" , err )
2025-01-05 11:30:34 +01:00
} else {
2025-03-02 08:51:13 +01:00
slog . Debug ( "updated peer ping status" , "peer" , peer . Identifier )
2025-01-05 11:30:34 +01:00
}
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
if connectionStateChanged {
// publish event if connection state changed
2025-06-29 19:49:01 +02:00
c . bus . Publish ( app . TopicPeerStateChanged , newPeerStatus , peer )
add webhook event for peer state change (#444) (#468)
* add webhook event for peer state change (#444)
new event types: connect and disconnect
example payload:
```json
{
"event": "connect",
"entity": "peer",
"identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"payload": {
"PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
"IsConnected": true,
"IsPingable": false,
"LastPing": null,
"BytesReceived": 1860,
"BytesTransmitted": 10824,
"LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
"Endpoint": "10.55.66.77:33874",
"LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
}
}
```
* add webhook docs (#444)
2025-06-27 12:37:10 +02:00
}
2023-08-04 13:34:18 +02:00
}
}
2025-08-10 14:42:02 +02:00
func ( c * StatisticsCollector ) isPeerPingable (
ctx context . Context ,
backend domain . InterfaceBackend ,
peer domain . Peer ,
) bool {
2024-09-29 22:10:50 +02:00
if ! c . cfg . Statistics . UsePingChecks {
2023-08-04 13:34:18 +02:00
return false
}
checkAddr := peer . CheckAliveAddress ( )
if checkAddr == "" {
return false
}
2025-08-10 14:42:02 +02:00
stats , err := c . wg . GetControllerByName ( backend ) . PingAddresses ( ctx , checkAddr )
2023-08-04 13:34:18 +02:00
if err != nil {
2025-08-10 14:42:02 +02:00
slog . Debug ( "failed to ping peer" , "peer" , peer . Identifier , "error" , err )
2023-08-04 13:34:18 +02:00
return false
}
2025-08-10 14:42:02 +02:00
return stats . IsPingable ( )
2023-08-04 13:34:18 +02:00
}
2024-10-09 22:33:50 +02:00
func ( c * StatisticsCollector ) updateInterfaceMetrics ( status domain . InterfaceStatus ) {
c . ms . UpdateInterfaceMetrics ( status )
}
func ( c * StatisticsCollector ) updatePeerMetrics ( ctx context . Context , status domain . PeerStatus ) {
// Fetch peer data from the database
peer , err := c . db . GetPeer ( ctx , status . PeerId )
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Warn ( "failed to fetch peer data for metrics" , "peer" , status . PeerId , "error" , err )
2024-10-09 22:33:50 +02:00
return
}
c . ms . UpdatePeerMetrics ( peer , status )
}
2025-01-05 11:30:34 +01:00
func ( c * StatisticsCollector ) connectToMessageBus ( ) {
_ = c . bus . Subscribe ( app . TopicPeerIdentifierUpdated , c . handlePeerIdentifierChangeEvent )
}
func ( c * StatisticsCollector ) handlePeerIdentifierChangeEvent ( oldIdentifier , newIdentifier domain . PeerIdentifier ) {
ctx := domain . SetUserInfo ( context . Background ( ) , domain . SystemAdminContextUserInfo ( ) )
// remove potential left-over status data
err := c . db . DeletePeerStatus ( ctx , oldIdentifier )
if err != nil {
2025-03-02 08:51:13 +01:00
slog . Error ( "failed to delete old peer status for migrated peer" , "oldIdentifier" , oldIdentifier ,
"newIdentifier" , newIdentifier , "error" , err )
2025-01-05 11:30:34 +01:00
}
}