add webhook event for peer state change (#444)

new event types: connect and disconnect

example payload:

```json
{
  "event": "connect",
  "entity": "peer",
  "identifier": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
  "payload": {
    "PeerId": "Fb5TaziAs1WrPBjC/MFbWsIelVXvi0hDKZ3YQM9wmU8=",
    "IsConnected": true,
    "IsPingable": false,
    "LastPing": null,
    "BytesReceived": 1860,
    "BytesTransmitted": 10824,
    "LastHandshake": "2025-06-26T23:04:33.325216659+02:00",
    "Endpoint": "10.55.66.77:33874",
    "LastSessionStart": "2025-06-26T22:50:40.10221606+02:00"
  }
}
```
This commit is contained in:
Christoph Haas 2025-06-26 23:25:53 +02:00
parent 3a732fd3e5
commit 3c72a26e91
No known key found for this signature in database
8 changed files with 72 additions and 18 deletions

View File

@ -133,5 +133,5 @@ func (m *MetricsServer) UpdatePeerMetrics(peer *domain.Peer, status domain.PeerS
}
m.peerReceivedBytesTotal.WithLabelValues(labels...).Set(float64(status.BytesReceived))
m.peerSendBytesTotal.WithLabelValues(labels...).Set(float64(status.BytesTransmitted))
m.peerIsConnected.WithLabelValues(labels...).Set(internal.BoolToFloat64(status.IsConnected()))
m.peerIsConnected.WithLabelValues(labels...).Set(internal.BoolToFloat64(status.IsConnected))
}

View File

@ -198,7 +198,7 @@ func NewPeerStats(enabled bool, src []domain.PeerStatus) *PeerStats {
for _, srcStat := range src {
stats[string(srcStat.PeerId)] = PeerStatData{
IsConnected: srcStat.IsConnected(),
IsConnected: srcStat.IsConnected,
IsPingable: srcStat.IsPingable,
LastPing: srcStat.LastPing,
BytesReceived: srcStat.BytesReceived,

View File

@ -36,6 +36,7 @@ const TopicPeerDeleted = "peer:deleted"
const TopicPeerUpdated = "peer:updated"
const TopicPeerInterfaceUpdated = "peer:interface:updated"
const TopicPeerIdentifierUpdated = "peer:identifier:updated"
const TopicPeerStateChanged = "peer:state:changed"
// endregion peer-events

View File

@ -64,6 +64,7 @@ func (m Manager) connectToMessageBus() {
_ = m.bus.Subscribe(app.TopicPeerCreated, m.handlePeerCreateEvent)
_ = m.bus.Subscribe(app.TopicPeerUpdated, m.handlePeerUpdateEvent)
_ = m.bus.Subscribe(app.TopicPeerDeleted, m.handlePeerDeleteEvent)
_ = m.bus.Subscribe(app.TopicPeerStateChanged, m.handlePeerStateChangeEvent)
_ = m.bus.Subscribe(app.TopicInterfaceCreated, m.handleInterfaceCreateEvent)
_ = m.bus.Subscribe(app.TopicInterfaceUpdated, m.handleInterfaceUpdateEvent)
@ -135,6 +136,14 @@ func (m Manager) handleInterfaceDeleteEvent(iface domain.Interface) {
m.handleGenericEvent(WebhookEventDelete, iface)
}
func (m Manager) handlePeerStateChangeEvent(peerStatus domain.PeerStatus) {
if peerStatus.IsConnected {
m.handleGenericEvent(WebhookEventConnect, peerStatus)
} else {
m.handleGenericEvent(WebhookEventDisconnect, peerStatus)
}
}
func (m Manager) handleGenericEvent(action WebhookEvent, payload any) {
eventData, err := m.createWebhookData(action, payload)
if err != nil {
@ -177,6 +186,9 @@ func (m Manager) createWebhookData(action WebhookEvent, payload any) (*WebhookDa
case domain.Interface:
d.Entity = WebhookEntityInterface
d.Identifier = string(v.Identifier)
case domain.PeerStatus:
d.Entity = WebhookEntityPeer
d.Identifier = string(v.PeerId)
default:
return nil, fmt.Errorf("unsupported payload type: %T", v)
}

View File

@ -45,4 +45,6 @@ const (
WebhookEventCreate WebhookEvent = "create"
WebhookEventUpdate WebhookEvent = "update"
WebhookEventDelete WebhookEvent = "delete"
WebhookEventConnect WebhookEvent = "connect"
WebhookEventDisconnect WebhookEvent = "disconnect"
)

View File

@ -43,6 +43,8 @@ type StatisticsMetricsServer interface {
type StatisticsEventBus interface {
// Subscribe subscribes to a topic
Subscribe(topic string, fn interface{}) error
// Publish sends a message to the message bus.
Publish(topic string, args ...any)
}
type StatisticsCollector struct {
@ -55,6 +57,8 @@ type StatisticsCollector struct {
db StatisticsDatabaseRepo
wg StatisticsInterfaceController
ms StatisticsMetricsServer
peerChangeEvent chan domain.PeerIdentifier
}
// NewStatisticsCollector creates a new statistics collector.
@ -171,8 +175,12 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
continue
}
for _, peer := range peers {
var connectionStateChanged bool
var newPeerStatus domain.PeerStatus
err = c.db.UpdatePeerStatus(ctx, peer.Identifier,
func(p *domain.PeerStatus) (*domain.PeerStatus, error) {
wasConnected := p.IsConnected
var lastHandshake *time.Time
if !peer.LastHandshake.IsZero() {
lastHandshake = &peer.LastHandshake
@ -186,6 +194,12 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
p.BytesTransmitted = peer.BytesDownload // store bytes that where received from the peer and sent by the server
p.Endpoint = peer.Endpoint
p.LastHandshake = lastHandshake
p.CalcConnected()
if wasConnected != p.IsConnected {
connectionStateChanged = true
newPeerStatus = *p // store new status for event publishing
}
// Update prometheus metrics
go c.updatePeerMetrics(ctx, *p)
@ -197,6 +211,11 @@ func (c *StatisticsCollector) collectPeerData(ctx context.Context) {
} else {
slog.Debug("updated peer status", "peer", peer.Identifier)
}
if connectionStateChanged {
// publish event if connection state changed
c.bus.Publish(app.TopicPeerStateChanged, newPeerStatus)
}
}
}
}
@ -298,12 +317,17 @@ func (c *StatisticsCollector) enqueuePingChecks(ctx context.Context) {
func (c *StatisticsCollector) pingWorker(ctx context.Context) {
defer c.pingWaitGroup.Done()
for peer := range c.pingJobs {
var connectionStateChanged bool
var newPeerStatus domain.PeerStatus
peerPingable := c.isPeerPingable(ctx, peer)
slog.Debug("peer ping check completed", "peer", peer.Identifier, "pingable", peerPingable)
now := time.Now()
err := c.db.UpdatePeerStatus(ctx, peer.Identifier,
func(p *domain.PeerStatus) (*domain.PeerStatus, error) {
wasConnected := p.IsConnected
if peerPingable {
p.IsPingable = true
p.LastPing = &now
@ -311,6 +335,13 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) {
p.IsPingable = false
p.LastPing = nil
}
p.UpdatedAt = time.Now()
p.CalcConnected()
if wasConnected != p.IsConnected {
connectionStateChanged = true
newPeerStatus = *p // store new status for event publishing
}
// Update prometheus metrics
go c.updatePeerMetrics(ctx, *p)
@ -322,6 +353,11 @@ func (c *StatisticsCollector) pingWorker(ctx context.Context) {
} else {
slog.Debug("updated peer ping status", "peer", peer.Identifier)
}
if connectionStateChanged {
// publish event if connection state changed
c.bus.Publish(app.TopicPeerStateChanged, newPeerStatus)
}
}
}

View File

@ -3,21 +3,23 @@ package domain
import "time"
type PeerStatus struct {
PeerId PeerIdentifier `gorm:"primaryKey;column:identifier"`
UpdatedAt time.Time `gorm:"column:updated_at"`
PeerId PeerIdentifier `gorm:"primaryKey;column:identifier" json:"PeerId"`
UpdatedAt time.Time `gorm:"column:updated_at" json:"-"`
IsPingable bool `gorm:"column:pingable"`
LastPing *time.Time `gorm:"column:last_ping"`
IsConnected bool `gorm:"column:connected" json:"IsConnected"` // indicates if the peer is connected based on the last handshake or ping
BytesReceived uint64 `gorm:"column:received"`
BytesTransmitted uint64 `gorm:"column:transmitted"`
IsPingable bool `gorm:"column:pingable" json:"IsPingable"`
LastPing *time.Time `gorm:"column:last_ping" json:"LastPing"`
LastHandshake *time.Time `gorm:"column:last_handshake"`
Endpoint string `gorm:"column:endpoint"`
LastSessionStart *time.Time `gorm:"column:last_session_start"`
BytesReceived uint64 `gorm:"column:received" json:"BytesReceived"`
BytesTransmitted uint64 `gorm:"column:transmitted" json:"BytesTransmitted"`
LastHandshake *time.Time `gorm:"column:last_handshake" json:"LastHandshake"`
Endpoint string `gorm:"column:endpoint" json:"Endpoint"`
LastSessionStart *time.Time `gorm:"column:last_session_start" json:"LastSessionStart"`
}
func (s PeerStatus) IsConnected() bool {
func (s *PeerStatus) CalcConnected() {
oldestHandshakeTime := time.Now().Add(-2 * time.Minute) // if a handshake is older than 2 minutes, the peer is no longer connected
handshakeValid := false
@ -25,7 +27,7 @@ func (s PeerStatus) IsConnected() bool {
handshakeValid = !s.LastHandshake.Before(oldestHandshakeTime)
}
return s.IsPingable || handshakeValid
s.IsConnected = s.IsPingable || handshakeValid
}
type InterfaceStatus struct {

View File

@ -66,8 +66,9 @@ func TestPeerStatus_IsConnected(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.status.IsConnected(); got != tt.want {
t.Errorf("IsConnected() = %v, want %v", got, tt.want)
tt.status.CalcConnected()
if got := tt.status.IsConnected; got != tt.want {
t.Errorf("IsConnected = %v, want %v", got, tt.want)
}
})
}