From e10b4abec480a0549f1c648963563df1ec2034d3 Mon Sep 17 00:00:00 2001 From: Christoph Haas Date: Sat, 9 Aug 2025 15:21:17 +0200 Subject: [PATCH] speed up mikrotik interactions --- internal/adapters/wgcontroller/mikrotik.go | 129 +++++++++++++++------ internal/config/backend.go | 16 +++ 2 files changed, 110 insertions(+), 35 deletions(-) diff --git a/internal/adapters/wgcontroller/mikrotik.go b/internal/adapters/wgcontroller/mikrotik.go index 085281f..8498d34 100644 --- a/internal/adapters/wgcontroller/mikrotik.go +++ b/internal/adapters/wgcontroller/mikrotik.go @@ -72,13 +72,46 @@ func (c *MikrotikController) GetInterfaces(ctx context.Context) ([]domain.Physic return nil, fmt.Errorf("failed to query interfaces: %v", wgReply.Error) } + // Parallelize loading of interface details to speed up overall latency. + // Use a bounded semaphore to avoid overloading the MikroTik device. + maxConcurrent := c.cfg.GetConcurrency() + sem := make(chan struct{}, maxConcurrent) + interfaces := make([]domain.PhysicalInterface, 0, len(wgReply.Data)) - for _, wg := range wgReply.Data { - physicalInterface, err := c.loadInterfaceData(ctx, wg) - if err != nil { - return nil, err - } - interfaces = append(interfaces, *physicalInterface) + var mu sync.Mutex + var wgWait sync.WaitGroup + var firstErr error + ctx2, cancel := context.WithCancel(ctx) + defer cancel() + + for _, wgObj := range wgReply.Data { + wgWait.Add(1) + sem <- struct{}{} // block if more than maxConcurrent requests are processing + go func(wg lowlevel.GenericJsonObject) { + defer wgWait.Done() + defer func() { <-sem }() // read from the semaphore and make space for the next entry + if firstErr != nil { + return + } + pi, err := c.loadInterfaceData(ctx2, wg) + if err != nil { + mu.Lock() + if firstErr == nil { + firstErr = err + cancel() + } + mu.Unlock() + return + } + mu.Lock() + interfaces = append(interfaces, *pi) + mu.Unlock() + }(wgObj) + } + + wgWait.Wait() + if firstErr != nil { + return nil, firstErr } return interfaces, nil @@ -139,37 +172,63 @@ func (c *MikrotikController) loadIpAddresses( ctx context.Context, deviceName string, ) (ipv4 []lowlevel.GenericJsonObject, ipv6 []lowlevel.GenericJsonObject, err error) { - addrV4Reply := c.client.Query(ctx, "/ip/address", &lowlevel.MikrotikRequestOptions{ - PropList: []string{ - ".id", "address", "network", - }, - Filters: map[string]string{ - "interface": deviceName, - "dynamic": "false", // we only want static addresses - "disabled": "false", // we only want addresses that are not disabled - }, - }) - if addrV4Reply.Status != lowlevel.MikrotikApiStatusOk { - return nil, nil, fmt.Errorf("failed to query IPv4 addresses for interface %s: %v", deviceName, - addrV4Reply.Error) + // Query IPv4 and IPv6 addresses in parallel to reduce latency. + var ( + v4 []lowlevel.GenericJsonObject + v6 []lowlevel.GenericJsonObject + v4Err error + v6Err error + wg sync.WaitGroup + ) + wg.Add(2) + + go func() { + defer wg.Done() + addrV4Reply := c.client.Query(ctx, "/ip/address", &lowlevel.MikrotikRequestOptions{ + PropList: []string{ + ".id", "address", "network", + }, + Filters: map[string]string{ + "interface": deviceName, + "dynamic": "false", // we only want static addresses + "disabled": "false", // we only want addresses that are not disabled + }, + }) + if addrV4Reply.Status != lowlevel.MikrotikApiStatusOk { + v4Err = fmt.Errorf("failed to query IPv4 addresses for interface %s: %v", deviceName, addrV4Reply.Error) + return + } + v4 = addrV4Reply.Data + }() + + go func() { + defer wg.Done() + addrV6Reply := c.client.Query(ctx, "/ipv6/address", &lowlevel.MikrotikRequestOptions{ + PropList: []string{ + ".id", "address", "network", + }, + Filters: map[string]string{ + "interface": deviceName, + "dynamic": "false", // we only want static addresses + "disabled": "false", // we only want addresses that are not disabled + }, + }) + if addrV6Reply.Status != lowlevel.MikrotikApiStatusOk { + v6Err = fmt.Errorf("failed to query IPv6 addresses for interface %s: %v", deviceName, addrV6Reply.Error) + return + } + v6 = addrV6Reply.Data + }() + + wg.Wait() + if v4Err != nil { + return nil, nil, v4Err + } + if v6Err != nil { + return nil, nil, v6Err } - addrV6Reply := c.client.Query(ctx, "/ipv6/address", &lowlevel.MikrotikRequestOptions{ - PropList: []string{ - ".id", "address", "network", - }, - Filters: map[string]string{ - "interface": deviceName, - "dynamic": "false", // we only want static addresses - "disabled": "false", // we only want addresses that are not disabled - }, - }) - if addrV6Reply.Status != lowlevel.MikrotikApiStatusOk { - return nil, nil, fmt.Errorf("failed to query IPv6 addresses for interface %s: %v", deviceName, - addrV6Reply.Error) - } - - return addrV4Reply.Data, addrV6Reply.Data, nil + return v4, v6, nil } func (c *MikrotikController) convertIpAddresses( diff --git a/internal/config/backend.go b/internal/config/backend.go index 5508e1c..3ec7ff0 100644 --- a/internal/config/backend.go +++ b/internal/config/backend.go @@ -53,5 +53,21 @@ type BackendMikrotik struct { ApiVerifyTls bool `yaml:"api_verify_tls"` // Whether to verify the TLS certificate of the Mikrotik API ApiTimeout time.Duration `yaml:"api_timeout"` // Timeout for API requests (default: 30 seconds) + // Concurrency controls the maximum number of concurrent API requests that this backend will issue + // when enumerating interfaces and their details. If 0 or negative, a default of 5 is used. + Concurrency int `yaml:"concurrency"` + Debug bool `yaml:"debug"` // Enable debug logging for the Mikrotik backend } + +// GetConcurrency returns the configured concurrency for this backend or a sane default (5) +// when the configured value is zero or negative. +func (b *BackendMikrotik) GetConcurrency() int { + if b == nil { + return 5 + } + if b.Concurrency <= 0 { + return 5 + } + return b.Concurrency +}