diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index 27acfa6b15..7d5db8a296 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -6,7 +6,17 @@ import ( client "github.com/influxdata/kapacitor/client/v1" ) -const ListTaskWorkers = 4 +const ( + // ListTaskWorkers describes the number of workers concurrently fetching + // tasks from Kapacitor. This constant was chosen after some benchmarking + // work and should likely work well for quad-core systems + ListTaskWorkers = 4 + + // TaskGatherers is the number of workers collating responses from + // ListTaskWorkers. There can only be one without additional synchronization + // around the output buffer from ListTasks + TaskGatherers = 1 +) // ensure PaginatingKapaClient is a KapaClient var _ KapaClient = &PaginatingKapaClient{} @@ -43,18 +53,18 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done) } - var taskAsm sync.WaitGroup - taskAsm.Add(1) + var gatherWg sync.WaitGroup + gatherWg.Add(TaskGatherers) go func() { for task := range taskChan { allTasks = append(allTasks, task...) } - taskAsm.Done() + gatherWg.Done() }() wg.Wait() close(taskChan) - taskAsm.Wait() + gatherWg.Wait() return allTasks, nil }