chronograf/kapacitor/kapa_client.go

114 lines
3.0 KiB
Go
Raw Normal View History

package kapacitor
import (
"sync"
client "github.com/influxdata/kapacitor/client/v1"
)
const (
// ListTaskWorkers describes the number of workers concurrently fetching
// tasks from Kapacitor. This constant was chosen after some benchmarking
// work and should likely work well for quad-core systems
ListTaskWorkers = 4
// TaskGatherers is the number of workers collating responses from
// ListTaskWorkers. There can only be one without additional synchronization
// around the output buffer from ListTasks
TaskGatherers = 1
)
// ensure PaginatingKapaClient is a KapaClient
var _ KapaClient = &PaginatingKapaClient{}
// PaginatingKapaClient is a Kapacitor client that automatically navigates
// through Kapacitor's pagination to fetch all results
type PaginatingKapaClient struct {
KapaClient
FetchRate int // specifies the number of elements to fetch from Kapacitor at a time
}
// ListTasks lists all available tasks from Kapacitor, navigating pagination as
// it fetches them
func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) {
// only trigger auto-pagination with Offset=0 and Limit=0
if opts.Limit != 0 || opts.Offset != 0 {
return p.KapaClient.ListTasks(opts)
}
allTasks := []client.Task{}
optChan := make(chan client.ListTasksOptions)
taskChan := make(chan []client.Task, ListTaskWorkers)
done := make(chan struct{})
var once sync.Once
go p.generateKapacitorOptions(optChan, *opts, done)
var wg sync.WaitGroup
wg.Add(ListTaskWorkers)
for i := 0; i < ListTaskWorkers; i++ {
go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done)
}
var gatherWg sync.WaitGroup
gatherWg.Add(TaskGatherers)
go func() {
for task := range taskChan {
allTasks = append(allTasks, task...)
}
gatherWg.Done()
}()
wg.Wait()
close(taskChan)
gatherWg.Wait()
return allTasks, nil
}
// fetchFromKapacitor fetches a set of results from a kapacitor by reading a
// set of options from the provided optChan. Fetched tasks are pushed onto the
// provided taskChan
func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, wg *sync.WaitGroup, closer *sync.Once, taskChan chan []client.Task, done chan struct{}) {
defer wg.Done()
for opt := range optChan {
resp, err := p.KapaClient.ListTasks(&opt)
if err != nil {
return
}
// break and stop all workers if we're done
if len(resp) == 0 {
closer.Do(func() {
close(done)
})
return
}
// handoff tasks to consumer
taskChan <- resp
}
}
// generateKapacitorOptions creates ListTasksOptions with incrementally greater
// Limit and Offset parameters, and inserts them into the provided optChan
func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) {
// ensure Limit and Offset start from known quantities
opts.Limit = p.FetchRate
opts.Offset = 0
for {
select {
case <-done:
close(optChan)
return
case optChan <- opts:
// nop
}
opts.Offset = p.FetchRate + opts.Offset
}
}