diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index f0d6c9e483..27acfa6b15 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -34,56 +34,13 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien var once sync.Once - doneCloser := func() { - close(done) - } - - go func() { - opts := &client.ListTasksOptions{ - // copy existing fields - TaskOptions: opts.TaskOptions, - Pattern: opts.Pattern, - Fields: opts.Fields, - - // we take control of these two in the loop below - Limit: p.FetchRate, - Offset: 0, - } - - for { - select { - case <-done: - close(optChan) - return - case optChan <- *opts: - // nop - } - opts.Offset = p.FetchRate + opts.Offset - } - }() + go p.generateKapacitorOptions(optChan, *opts, done) var wg sync.WaitGroup wg.Add(ListTaskWorkers) for i := 0; i < ListTaskWorkers; i++ { - go func() { - defer wg.Done() - for opt := range optChan { - resp, err := p.KapaClient.ListTasks(&opt) - if err != nil { - return - } - - // break and stop all workers if we're done - if len(resp) == 0 { - once.Do(doneCloser) - return - } - - // handoff tasks to consumer - taskChan <- resp - } - }() + go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done) } var taskAsm sync.WaitGroup @@ -101,3 +58,46 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien return allTasks, nil } + +// fetchFromKapacitor fetches a set of results from a kapacitor by reading a +// set of options from the provided optChan. Fetched tasks are pushed onto the +// provided taskChan +func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, wg *sync.WaitGroup, closer *sync.Once, taskChan chan []client.Task, done chan struct{}) { + defer wg.Done() + for opt := range optChan { + resp, err := p.KapaClient.ListTasks(&opt) + if err != nil { + return + } + + // break and stop all workers if we're done + if len(resp) == 0 { + closer.Do(func() { + close(done) + }) + return + } + + // handoff tasks to consumer + taskChan <- resp + } +} + +// generateKapacitorOptions creates ListTasksOptions with incrementally greater +// Limit and Offset parameters, and inserts them into the provided optChan +func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) { + // ensure Limit and Offset start from known quantities + opts.Limit = p.FetchRate + opts.Offset = 0 + + for { + select { + case <-done: + close(optChan) + return + case optChan <- opts: + // nop + } + opts.Offset = p.FetchRate + opts.Offset + } +}