Extract option generator and task fetcher

The option generator goroutine and the task fetcher goroutine are more
readable when extracted into another method. Also, added some commenting
documenting what their expectations are
pull/10616/head
Tim Raymond 2017-08-23 16:21:13 -04:00
parent fe57d1e0c7
commit 8ae3dc35cc
1 changed files with 45 additions and 45 deletions

View File

@ -34,56 +34,13 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien
var once sync.Once
doneCloser := func() {
close(done)
}
go func() {
opts := &client.ListTasksOptions{
// copy existing fields
TaskOptions: opts.TaskOptions,
Pattern: opts.Pattern,
Fields: opts.Fields,
// we take control of these two in the loop below
Limit: p.FetchRate,
Offset: 0,
}
for {
select {
case <-done:
close(optChan)
return
case optChan <- *opts:
// nop
}
opts.Offset = p.FetchRate + opts.Offset
}
}()
go p.generateKapacitorOptions(optChan, *opts, done)
var wg sync.WaitGroup
wg.Add(ListTaskWorkers)
for i := 0; i < ListTaskWorkers; i++ {
go func() {
defer wg.Done()
for opt := range optChan {
resp, err := p.KapaClient.ListTasks(&opt)
if err != nil {
return
}
// break and stop all workers if we're done
if len(resp) == 0 {
once.Do(doneCloser)
return
}
// handoff tasks to consumer
taskChan <- resp
}
}()
go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done)
}
var taskAsm sync.WaitGroup
@ -101,3 +58,46 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien
return allTasks, nil
}
// fetchFromKapacitor fetches a set of results from a kapacitor by reading a
// set of options from the provided optChan. Fetched tasks are pushed onto the
// provided taskChan
func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, wg *sync.WaitGroup, closer *sync.Once, taskChan chan []client.Task, done chan struct{}) {
defer wg.Done()
for opt := range optChan {
resp, err := p.KapaClient.ListTasks(&opt)
if err != nil {
return
}
// break and stop all workers if we're done
if len(resp) == 0 {
closer.Do(func() {
close(done)
})
return
}
// handoff tasks to consumer
taskChan <- resp
}
}
// generateKapacitorOptions creates ListTasksOptions with incrementally greater
// Limit and Offset parameters, and inserts them into the provided optChan
func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) {
// ensure Limit and Offset start from known quantities
opts.Limit = p.FetchRate
opts.Offset = 0
for {
select {
case <-done:
close(optChan)
return
case optChan <- opts:
// nop
}
opts.Offset = p.FetchRate + opts.Offset
}
}