Perf: improve performance of ListTasks

This fetches ListTasks results using a few worker goroutines to improve
performance with large numbers of tasks.
pull/10616/head
Tim Raymond 2017-08-21 17:10:13 -04:00
parent f7291550c0
commit 0d2991d719
2 changed files with 116 additions and 17 deletions

View File

@ -1,9 +1,13 @@
package kapacitor
import (
"sync"
client "github.com/influxdata/kapacitor/client/v1"
)
const ListTaskWorkers = 4
// ensure PaginatingKapaClient is a KapaClient
var _ KapaClient = &PaginatingKapaClient{}
@ -24,30 +28,76 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien
allTasks := []client.Task{}
allOpts := &client.ListTasksOptions{
// copy existing fields
TaskOptions: opts.TaskOptions,
Pattern: opts.Pattern,
Fields: opts.Fields,
optChan := make(chan client.ListTasksOptions)
taskChan := make(chan []client.Task, ListTaskWorkers)
done := make(chan struct{})
// we take control of these two in the loop below
Limit: p.FetchRate,
Offset: 0,
var once sync.Once
doneCloser := func() {
close(done)
}
for {
resp, err := p.KapaClient.ListTasks(allOpts)
if err != nil {
return allTasks, err
go func() {
opts := &client.ListTasksOptions{
// copy existing fields
TaskOptions: opts.TaskOptions,
Pattern: opts.Pattern,
Fields: opts.Fields,
// we take control of these two in the loop below
Limit: p.FetchRate,
Offset: 0,
}
// break if we've exhausted available tasks
if len(resp) == 0 {
break
for {
opts.Offset = p.FetchRate + opts.Offset
select {
case <-done:
close(optChan)
return
case optChan <- *opts:
// nop
}
}
}()
allTasks = append(allTasks, resp...)
allOpts.Offset += len(resp)
var wg sync.WaitGroup
wg.Add(ListTaskWorkers)
for i := 0; i < ListTaskWorkers; i++ {
go func() {
defer wg.Done()
for opt := range optChan {
resp, err := p.KapaClient.ListTasks(&opt)
if err != nil {
return
}
// break and stop all workers if we're done
if len(resp) == 0 {
once.Do(doneCloser)
return
}
// handoff tasks to consumer
taskChan <- resp
}
}()
}
var taskAsm sync.WaitGroup
taskAsm.Add(1)
go func() {
for task := range taskChan {
allTasks = append(allTasks, task...)
}
taskAsm.Done()
}()
wg.Wait()
close(taskChan)
taskAsm.Wait()
return allTasks, nil
}

View File

@ -0,0 +1,49 @@
package kapacitor_test
import (
"testing"
"github.com/influxdata/chronograf/kapacitor"
"github.com/influxdata/chronograf/mocks"
client "github.com/influxdata/kapacitor/client/v1"
)
func BenchmarkKapaClient100(b *testing.B) { benchmark_PaginatingKapaClient(100, b) }
func BenchmarkKapaClient1000(b *testing.B) { benchmark_PaginatingKapaClient(1000, b) }
func BenchmarkKapaClient10000(b *testing.B) { benchmark_PaginatingKapaClient(10000, b) }
func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) {
// create a mock client that will return a huge response from ListTasks
mockClient := &mocks.KapaClient{
ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) {
// create all the tasks
allTasks := []client.Task{}
// create N tasks from the benchmark runner
for i := 0; i < taskCount; i++ {
allTasks = append(allTasks, client.Task{})
}
begin := opts.Offset
end := opts.Offset + opts.Limit
if end > len(allTasks) {
end = len(allTasks)
}
if begin > len(allTasks) {
begin = end
}
return allTasks[begin:end], nil
},
}
pkap := kapacitor.PaginatingKapaClient{mockClient, 50}
opts := &client.ListTasksOptions{}
// let the benchmark runner run ListTasks until it's satisfied
for n := 0; n < b.N; n++ {
_, _ = pkap.ListTasks(opts)
}
}