From 0d2991d719cd6eb60bcb0ed76e9ff805ebf587bc Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Mon, 21 Aug 2017 17:10:13 -0400 Subject: [PATCH] Perf: improve performance of ListTasks This fetches ListTasks results using a few worker goroutines to improve performance with large numbers of tasks. --- kapacitor/kapa_client.go | 84 ++++++++++++++++++++----- kapacitor/kapa_client_benchmark_test.go | 49 +++++++++++++++ 2 files changed, 116 insertions(+), 17 deletions(-) create mode 100644 kapacitor/kapa_client_benchmark_test.go diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index 0c576d3309..31f51bc96a 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -1,9 +1,13 @@ package kapacitor import ( + "sync" + client "github.com/influxdata/kapacitor/client/v1" ) +const ListTaskWorkers = 4 + // ensure PaginatingKapaClient is a KapaClient var _ KapaClient = &PaginatingKapaClient{} @@ -24,30 +28,76 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien allTasks := []client.Task{} - allOpts := &client.ListTasksOptions{ - // copy existing fields - TaskOptions: opts.TaskOptions, - Pattern: opts.Pattern, - Fields: opts.Fields, + optChan := make(chan client.ListTasksOptions) + taskChan := make(chan []client.Task, ListTaskWorkers) + done := make(chan struct{}) - // we take control of these two in the loop below - Limit: p.FetchRate, - Offset: 0, + var once sync.Once + + doneCloser := func() { + close(done) } - for { - resp, err := p.KapaClient.ListTasks(allOpts) - if err != nil { - return allTasks, err + go func() { + opts := &client.ListTasksOptions{ + // copy existing fields + TaskOptions: opts.TaskOptions, + Pattern: opts.Pattern, + Fields: opts.Fields, + + // we take control of these two in the loop below + Limit: p.FetchRate, + Offset: 0, } - // break if we've exhausted available tasks - if len(resp) == 0 { - break + for { + opts.Offset = p.FetchRate + opts.Offset + select { + case <-done: + close(optChan) + return + case optChan <- *opts: + // nop + } } + }() - allTasks = append(allTasks, resp...) - allOpts.Offset += len(resp) + var wg sync.WaitGroup + + wg.Add(ListTaskWorkers) + for i := 0; i < ListTaskWorkers; i++ { + go func() { + defer wg.Done() + for opt := range optChan { + resp, err := p.KapaClient.ListTasks(&opt) + if err != nil { + return + } + + // break and stop all workers if we're done + if len(resp) == 0 { + once.Do(doneCloser) + return + } + + // handoff tasks to consumer + taskChan <- resp + } + }() } + + var taskAsm sync.WaitGroup + taskAsm.Add(1) + go func() { + for task := range taskChan { + allTasks = append(allTasks, task...) + } + taskAsm.Done() + }() + + wg.Wait() + close(taskChan) + taskAsm.Wait() + return allTasks, nil } diff --git a/kapacitor/kapa_client_benchmark_test.go b/kapacitor/kapa_client_benchmark_test.go new file mode 100644 index 0000000000..83bb880280 --- /dev/null +++ b/kapacitor/kapa_client_benchmark_test.go @@ -0,0 +1,49 @@ +package kapacitor_test + +import ( + "testing" + + "github.com/influxdata/chronograf/kapacitor" + "github.com/influxdata/chronograf/mocks" + client "github.com/influxdata/kapacitor/client/v1" +) + +func BenchmarkKapaClient100(b *testing.B) { benchmark_PaginatingKapaClient(100, b) } +func BenchmarkKapaClient1000(b *testing.B) { benchmark_PaginatingKapaClient(1000, b) } +func BenchmarkKapaClient10000(b *testing.B) { benchmark_PaginatingKapaClient(10000, b) } + +func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) { + // create a mock client that will return a huge response from ListTasks + mockClient := &mocks.KapaClient{ + ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) { + // create all the tasks + allTasks := []client.Task{} + + // create N tasks from the benchmark runner + for i := 0; i < taskCount; i++ { + allTasks = append(allTasks, client.Task{}) + } + begin := opts.Offset + end := opts.Offset + opts.Limit + + if end > len(allTasks) { + end = len(allTasks) + } + + if begin > len(allTasks) { + begin = end + } + + return allTasks[begin:end], nil + }, + } + + pkap := kapacitor.PaginatingKapaClient{mockClient, 50} + + opts := &client.ListTasksOptions{} + + // let the benchmark runner run ListTasks until it's satisfied + for n := 0; n < b.N; n++ { + _, _ = pkap.ListTasks(opts) + } +}