speed up mikrotik interactions

This commit is contained in:
Christoph Haas 2025-08-09 15:21:17 +02:00
parent 08373fa675
commit e10b4abec4
No known key found for this signature in database
2 changed files with 110 additions and 35 deletions

View File

@ -72,13 +72,46 @@ func (c *MikrotikController) GetInterfaces(ctx context.Context) ([]domain.Physic
return nil, fmt.Errorf("failed to query interfaces: %v", wgReply.Error) return nil, fmt.Errorf("failed to query interfaces: %v", wgReply.Error)
} }
// Parallelize loading of interface details to speed up overall latency.
// Use a bounded semaphore to avoid overloading the MikroTik device.
maxConcurrent := c.cfg.GetConcurrency()
sem := make(chan struct{}, maxConcurrent)
interfaces := make([]domain.PhysicalInterface, 0, len(wgReply.Data)) interfaces := make([]domain.PhysicalInterface, 0, len(wgReply.Data))
for _, wg := range wgReply.Data { var mu sync.Mutex
physicalInterface, err := c.loadInterfaceData(ctx, wg) var wgWait sync.WaitGroup
if err != nil { var firstErr error
return nil, err ctx2, cancel := context.WithCancel(ctx)
} defer cancel()
interfaces = append(interfaces, *physicalInterface)
for _, wgObj := range wgReply.Data {
wgWait.Add(1)
sem <- struct{}{} // block if more than maxConcurrent requests are processing
go func(wg lowlevel.GenericJsonObject) {
defer wgWait.Done()
defer func() { <-sem }() // read from the semaphore and make space for the next entry
if firstErr != nil {
return
}
pi, err := c.loadInterfaceData(ctx2, wg)
if err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
cancel()
}
mu.Unlock()
return
}
mu.Lock()
interfaces = append(interfaces, *pi)
mu.Unlock()
}(wgObj)
}
wgWait.Wait()
if firstErr != nil {
return nil, firstErr
} }
return interfaces, nil return interfaces, nil
@ -139,37 +172,63 @@ func (c *MikrotikController) loadIpAddresses(
ctx context.Context, ctx context.Context,
deviceName string, deviceName string,
) (ipv4 []lowlevel.GenericJsonObject, ipv6 []lowlevel.GenericJsonObject, err error) { ) (ipv4 []lowlevel.GenericJsonObject, ipv6 []lowlevel.GenericJsonObject, err error) {
addrV4Reply := c.client.Query(ctx, "/ip/address", &lowlevel.MikrotikRequestOptions{ // Query IPv4 and IPv6 addresses in parallel to reduce latency.
PropList: []string{ var (
".id", "address", "network", v4 []lowlevel.GenericJsonObject
}, v6 []lowlevel.GenericJsonObject
Filters: map[string]string{ v4Err error
"interface": deviceName, v6Err error
"dynamic": "false", // we only want static addresses wg sync.WaitGroup
"disabled": "false", // we only want addresses that are not disabled )
}, wg.Add(2)
})
if addrV4Reply.Status != lowlevel.MikrotikApiStatusOk { go func() {
return nil, nil, fmt.Errorf("failed to query IPv4 addresses for interface %s: %v", deviceName, defer wg.Done()
addrV4Reply.Error) addrV4Reply := c.client.Query(ctx, "/ip/address", &lowlevel.MikrotikRequestOptions{
PropList: []string{
".id", "address", "network",
},
Filters: map[string]string{
"interface": deviceName,
"dynamic": "false", // we only want static addresses
"disabled": "false", // we only want addresses that are not disabled
},
})
if addrV4Reply.Status != lowlevel.MikrotikApiStatusOk {
v4Err = fmt.Errorf("failed to query IPv4 addresses for interface %s: %v", deviceName, addrV4Reply.Error)
return
}
v4 = addrV4Reply.Data
}()
go func() {
defer wg.Done()
addrV6Reply := c.client.Query(ctx, "/ipv6/address", &lowlevel.MikrotikRequestOptions{
PropList: []string{
".id", "address", "network",
},
Filters: map[string]string{
"interface": deviceName,
"dynamic": "false", // we only want static addresses
"disabled": "false", // we only want addresses that are not disabled
},
})
if addrV6Reply.Status != lowlevel.MikrotikApiStatusOk {
v6Err = fmt.Errorf("failed to query IPv6 addresses for interface %s: %v", deviceName, addrV6Reply.Error)
return
}
v6 = addrV6Reply.Data
}()
wg.Wait()
if v4Err != nil {
return nil, nil, v4Err
}
if v6Err != nil {
return nil, nil, v6Err
} }
addrV6Reply := c.client.Query(ctx, "/ipv6/address", &lowlevel.MikrotikRequestOptions{ return v4, v6, nil
PropList: []string{
".id", "address", "network",
},
Filters: map[string]string{
"interface": deviceName,
"dynamic": "false", // we only want static addresses
"disabled": "false", // we only want addresses that are not disabled
},
})
if addrV6Reply.Status != lowlevel.MikrotikApiStatusOk {
return nil, nil, fmt.Errorf("failed to query IPv6 addresses for interface %s: %v", deviceName,
addrV6Reply.Error)
}
return addrV4Reply.Data, addrV6Reply.Data, nil
} }
func (c *MikrotikController) convertIpAddresses( func (c *MikrotikController) convertIpAddresses(

View File

@ -53,5 +53,21 @@ type BackendMikrotik struct {
ApiVerifyTls bool `yaml:"api_verify_tls"` // Whether to verify the TLS certificate of the Mikrotik API ApiVerifyTls bool `yaml:"api_verify_tls"` // Whether to verify the TLS certificate of the Mikrotik API
ApiTimeout time.Duration `yaml:"api_timeout"` // Timeout for API requests (default: 30 seconds) ApiTimeout time.Duration `yaml:"api_timeout"` // Timeout for API requests (default: 30 seconds)
// Concurrency controls the maximum number of concurrent API requests that this backend will issue
// when enumerating interfaces and their details. If 0 or negative, a default of 5 is used.
Concurrency int `yaml:"concurrency"`
Debug bool `yaml:"debug"` // Enable debug logging for the Mikrotik backend Debug bool `yaml:"debug"` // Enable debug logging for the Mikrotik backend
} }
// GetConcurrency returns the configured concurrency for this backend or a sane default (5)
// when the configured value is zero or negative.
func (b *BackendMikrotik) GetConcurrency() int {
if b == nil {
return 5
}
if b.Concurrency <= 0 {
return 5
}
return b.Concurrency
}