From d0be50ab37855572723daafac9fd1f2a4e48b318 Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Tue, 15 Aug 2017 16:53:21 -0400 Subject: [PATCH 01/10] Add PaginatingKapacitorClient The kapacitor client used in the kapacitor endpoints is limited to fetching whatever limit you provide it. If you provide no limit, it defaults to a limit of 100. We use this default behavior currently. Some users have more than 100 tasks, so we need a client that's capable of continually fetching tasks from Kapacitor until there are none left, and returning the full response to the frontend. This introduces a PaginatingKapacitorClient which does exactly that. Also, test coverage was added around the KapacitorRulesGet endpoint, since it was previously untested. --- kapacitor/kapa_client.go | 51 ++++++++++++ kapacitor/kapa_client_test.go | 54 ++++++++++++ mocks/kapacitor_client.go | 37 +++++++++ server/kapacitors_test.go | 149 +++++++++++++++++++++++++++++++++- 4 files changed, 289 insertions(+), 2 deletions(-) create mode 100644 kapacitor/kapa_client.go create mode 100644 kapacitor/kapa_client_test.go create mode 100644 mocks/kapacitor_client.go diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go new file mode 100644 index 000000000..773de7919 --- /dev/null +++ b/kapacitor/kapa_client.go @@ -0,0 +1,51 @@ +package kapacitor + +import client "github.com/influxdata/kapacitor/client/v1" + +// ensure PaginatingKapaClient is a KapaClient +var _ KapaClient = &PaginatingKapaClient{} + +// PaginatingKapaClient is a Kapacitor client that automatically navigates +// through Kapacitor's pagination to fetch all results +type PaginatingKapaClient struct { + KapaClient + FetchRate int // specifies the number of elements to fetch from Kapacitor at a time +} + +// ListTasks lists all available tasks from Kapacitor, navigating pagination as +// it fetches them +func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) { + // only trigger auto-pagination with Offset=0 and Limit=0 + if opts.Limit != 0 && opts.Offset != 0 { + p.KapaClient.ListTasks(opts) + } + + allTasks := []client.Task{} + + allOpts := &client.ListTasksOptions{ + // copy existing fields + TaskOptions: opts.TaskOptions, + Pattern: opts.Pattern, + Fields: opts.Fields, + + // we take control of these two in the loop below + Limit: p.FetchRate, + Offset: 0, + } + + for { + resp, err := p.KapaClient.ListTasks(allOpts) + if err != nil { + return allTasks, err + } + + // break if we've exhausted available tasks + if len(resp) == 0 { + break + } + + allTasks = append(allTasks, resp...) + allOpts.Offset += len(resp) + } + return allTasks, nil +} diff --git a/kapacitor/kapa_client_test.go b/kapacitor/kapa_client_test.go new file mode 100644 index 000000000..a615b3c4f --- /dev/null +++ b/kapacitor/kapa_client_test.go @@ -0,0 +1,54 @@ +package kapacitor_test + +import ( + "testing" + + "github.com/influxdata/chronograf/kapacitor" + "github.com/influxdata/chronograf/mocks" + client "github.com/influxdata/kapacitor/client/v1" +) + +func Test_Kapacitor_PaginatingKapaClient(t *testing.T) { + const lenAllTasks = 200 + + // create a mock client that will return a huge response from ListTasks + mockClient := &mocks.KapaClient{ + ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) { + // create all the tasks + allTasks := []client.Task{} + for i := 0; i < lenAllTasks; i++ { + allTasks = append(allTasks, client.Task{}) + } + begin := opts.Offset + end := opts.Offset + opts.Limit + + if end > len(allTasks) { + end = len(allTasks) + } + + return allTasks[begin:end], nil + }, + } + + pkap := kapacitor.PaginatingKapaClient{mockClient, 100} + + opts := &client.ListTasksOptions{ + Limit: 100, + Offset: 0, + } + + // ensure 100 elems returned when calling mockClient directly + tasks, _ := mockClient.ListTasks(opts) + + if len(tasks) != 100 { + t.Error("Expected calling KapaClient's ListTasks to return", opts.Limit, "items. Received:", len(tasks)) + } + + // ensure PaginatingKapaClient returns _all_ tasks with 0 value for Limit and Offset + allOpts := &client.ListTasksOptions{} + allTasks, _ := pkap.ListTasks(allOpts) + + if len(allTasks) != lenAllTasks { + t.Error("PaginatingKapaClient: Expected to find", lenAllTasks, "tasks but found", len(allTasks)) + } +} diff --git a/mocks/kapacitor_client.go b/mocks/kapacitor_client.go new file mode 100644 index 000000000..2d5e218bf --- /dev/null +++ b/mocks/kapacitor_client.go @@ -0,0 +1,37 @@ +package mocks + +import ( + "github.com/influxdata/chronograf/kapacitor" + client "github.com/influxdata/kapacitor/client/v1" +) + +var _ kapacitor.KapaClient = &KapaClient{} + +// Client is a mock Kapacitor client +type KapaClient struct { + CreateTaskF func(opts client.CreateTaskOptions) (client.Task, error) + DeleteTaskF func(link client.Link) error + ListTasksF func(opts *client.ListTasksOptions) ([]client.Task, error) + TaskF func(link client.Link, opts *client.TaskOptions) (client.Task, error) + UpdateTaskF func(link client.Link, opts client.UpdateTaskOptions) (client.Task, error) +} + +func (p *KapaClient) CreateTask(opts client.CreateTaskOptions) (client.Task, error) { + return p.CreateTaskF(opts) +} + +func (p *KapaClient) DeleteTask(link client.Link) error { + return p.DeleteTaskF(link) +} + +func (p *KapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) { + return p.ListTasksF(opts) +} + +func (p *KapaClient) Task(link client.Link, opts *client.TaskOptions) (client.Task, error) { + return p.TaskF(link, opts) +} + +func (p *KapaClient) UpdateTask(link client.Link, opts client.UpdateTaskOptions) (client.Task, error) { + return p.UpdateTaskF(link, opts) +} diff --git a/server/kapacitors_test.go b/server/kapacitors_test.go index 116aca560..7869ed81e 100644 --- a/server/kapacitors_test.go +++ b/server/kapacitors_test.go @@ -1,11 +1,29 @@ -package server +package server_test import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" "testing" + "github.com/bouk/httprouter" + "github.com/google/go-cmp/cmp" "github.com/influxdata/chronograf" + "github.com/influxdata/chronograf/mocks" + "github.com/influxdata/chronograf/server" ) +const tickScript = ` +stream + |from() + .measurement('cpu') + |alert() + .crit(lambda: "usage_idle" < 10) + .log('/tmp/alert') +` + func TestValidRuleRequest(t *testing.T) { tests := []struct { name string @@ -48,9 +66,136 @@ func TestValidRuleRequest(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := ValidRuleRequest(tt.rule); (err != nil) != tt.wantErr { + if err := server.ValidRuleRequest(tt.rule); (err != nil) != tt.wantErr { t.Errorf("ValidRuleRequest() error = %v, wantErr %v", err, tt.wantErr) } }) } } + +func Test_KapacitorRulesGet(t *testing.T) { + kapaTests := []struct { + name string + requestPath string + mockAlerts []chronograf.AlertRule + expected []chronograf.AlertRule + }{ + { + "basic", + "/chronograf/v1/sources/1/kapacitors/1/rules", + []chronograf.AlertRule{ + { + ID: "cpu_alert", + Name: "cpu_alert", + }, + }, + []chronograf.AlertRule{ + { + ID: "cpu_alert", + Name: "cpu_alert", + Alerts: []string{}, + }, + }, + }, + } + + for _, test := range kapaTests { + test := test // needed to avoid data race + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // setup mock kapa API + kapaSrv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + tsks := []map[string]interface{}{} + for _, task := range test.mockAlerts { + tsks = append(tsks, map[string]interface{}{ + "id": task.ID, + "script": tickScript, + "status": "enabled", + "link": map[string]interface{}{ + "rel": "self", + "href": "/kapacitor/v1/tasks/cpu_alert", + }, + }) + } + + tasks := map[string]interface{}{ + "tasks": tsks, + } + + err := json.NewEncoder(rw).Encode(&tasks) + if err != nil { + t.Error("Failed to encode JSON. err:", err) + } + })) + defer kapaSrv.Close() + + // setup mock service and test logger + testLogger := mocks.TestLogger{} + svc := &server.Service{ + ServersStore: &mocks.ServersStore{ + GetF: func(ctx context.Context, ID int) (chronograf.Server, error) { + return chronograf.Server{ + SrcID: ID, + URL: kapaSrv.URL, + }, nil + }, + }, + Logger: &testLogger, + } + + // setup request and response recorder + req := httptest.NewRequest("GET", test.requestPath, strings.NewReader("")) + rr := httptest.NewRecorder() + + // setup context and request params + bg := context.Background() + params := httprouter.Params{ + { + "id", + "1", + }, + { + "kid", + "1", + }, + } + ctx := httprouter.WithParams(bg, params) + req = req.WithContext(ctx) + + // invoke KapacitorRulesGet endpoint + svc.KapacitorRulesGet(rr, req) + + // destructure response + frame := struct { + Rules []struct { + chronograf.AlertRule + TICKScript json.RawMessage `json:"tickscript"` + Status json.RawMessage `json:"status"` + Links json.RawMessage `json:"links"` + } `json:"rules"` + }{} + + resp := rr.Result() + + err := json.NewDecoder(resp.Body).Decode(&frame) + if err != nil { + t.Fatal("Err decoding kapa rule response: err:", err) + } + + actual := make([]chronograf.AlertRule, len(frame.Rules)) + + for idx, _ := range frame.Rules { + actual[idx] = frame.Rules[idx].AlertRule + } + + if resp.StatusCode != http.StatusOK { + t.Fatal("Expected HTTP 200 OK but got", resp.Status) + } + + if !cmp.Equal(test.expected, actual) { + t.Fatalf("%q - Alert rules differ! diff:\n%s\n", test.name, cmp.Diff(test.expected, actual)) + } + }) + } +} From 886046ed9a508315a23d92f04bbfc1881e53ba3a Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Tue, 15 Aug 2017 17:30:29 -0400 Subject: [PATCH 02/10] Switch KapacitorRulesGet to use Pagination Kapacitor responses are paginated, and sometimes users have more than the default 100 tasks that are returned from Kapacitor. This replaces the previous Kapa client with one that automatically follows paginated responses from Kapacitor's ListTasks endpoint and returns the full response. Tests for the KapacitorRulesGet endpoint had to be updated because they did not account for "limit" and "offset", and so led to an infinite loop with the paginated client. A correct kapacitor backend will not have this behavior --- kapacitor/client.go | 12 +++++++++++- kapacitor/kapa_client.go | 4 +++- kapacitor/kapa_client_test.go | 4 ++-- server/kapacitors_test.go | 31 ++++++++++++++++++++++++++++--- 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/kapacitor/client.go b/kapacitor/client.go index 3f516d8ee..b868a5c57 100644 --- a/kapacitor/client.go +++ b/kapacitor/client.go @@ -12,6 +12,10 @@ import ( const ( // Prefix is prepended to the ID of all alerts Prefix = "chronograf-v1-" + + // DefaultFetchRate is the rate at which paginated responses will be consumed + // from a Kapacitor + FetchRate = 100 ) // Client communicates to kapacitor @@ -325,8 +329,14 @@ func NewKapaClient(url, username, password string) (KapaClient, error) { } } - return client.New(client.Config{ + clnt, err := client.New(client.Config{ URL: url, Credentials: creds, }) + + if err != nil { + return clnt, err + } + + return &PaginatingKapaClient{clnt, FetchRate}, nil } diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index 773de7919..603bb486b 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -1,6 +1,8 @@ package kapacitor -import client "github.com/influxdata/kapacitor/client/v1" +import ( + client "github.com/influxdata/kapacitor/client/v1" +) // ensure PaginatingKapaClient is a KapaClient var _ KapaClient = &PaginatingKapaClient{} diff --git a/kapacitor/kapa_client_test.go b/kapacitor/kapa_client_test.go index a615b3c4f..a1d790fcd 100644 --- a/kapacitor/kapa_client_test.go +++ b/kapacitor/kapa_client_test.go @@ -9,7 +9,7 @@ import ( ) func Test_Kapacitor_PaginatingKapaClient(t *testing.T) { - const lenAllTasks = 200 + const lenAllTasks = 227 // prime, to stress odd result sets // create a mock client that will return a huge response from ListTasks mockClient := &mocks.KapaClient{ @@ -30,7 +30,7 @@ func Test_Kapacitor_PaginatingKapaClient(t *testing.T) { }, } - pkap := kapacitor.PaginatingKapaClient{mockClient, 100} + pkap := kapacitor.PaginatingKapaClient{mockClient, 50} opts := &client.ListTasksOptions{ Limit: 100, diff --git a/server/kapacitors_test.go b/server/kapacitors_test.go index 7869ed81e..6d4ba76e7 100644 --- a/server/kapacitors_test.go +++ b/server/kapacitors_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "strconv" "strings" "testing" @@ -106,6 +107,18 @@ func Test_KapacitorRulesGet(t *testing.T) { // setup mock kapa API kapaSrv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + params := r.URL.Query() + limit, err := strconv.Atoi(params.Get("limit")) + if err != nil { + rw.WriteHeader(http.StatusBadRequest) + return + } + offset, err := strconv.Atoi(params.Get("offset")) + if err != nil { + rw.WriteHeader(http.StatusBadRequest) + return + } + tsks := []map[string]interface{}{} for _, task := range test.mockAlerts { tsks = append(tsks, map[string]interface{}{ @@ -119,11 +132,23 @@ func Test_KapacitorRulesGet(t *testing.T) { }) } - tasks := map[string]interface{}{ - "tasks": tsks, + var tasks map[string]interface{} + + if offset >= len(tsks) { + tasks = map[string]interface{}{ + "tasks": []map[string]interface{}{}, + } + } else if limit+offset > len(tsks) { + tasks = map[string]interface{}{ + "tasks": tsks[offset:], + } + } else { + tasks = map[string]interface{}{ + "tasks": tsks[offset : offset+limit], + } } - err := json.NewEncoder(rw).Encode(&tasks) + err = json.NewEncoder(rw).Encode(&tasks) if err != nil { t.Error("Failed to encode JSON. err:", err) } From 01596453bdc0b14b3f491a12131f6ba8a6b03656 Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Fri, 18 Aug 2017 16:44:04 -0400 Subject: [PATCH 03/10] Update comment Variable name changed. Improved clarity. --- kapacitor/client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kapacitor/client.go b/kapacitor/client.go index b868a5c57..b7687f3d9 100644 --- a/kapacitor/client.go +++ b/kapacitor/client.go @@ -13,8 +13,7 @@ const ( // Prefix is prepended to the ID of all alerts Prefix = "chronograf-v1-" - // DefaultFetchRate is the rate at which paginated responses will be consumed - // from a Kapacitor + // FetchRate is the rate Paginating Kapacitor Clients will consume responses FetchRate = 100 ) From 81f9e410f90516e54e461c793304ffdb6c2f4588 Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Fri, 18 Aug 2017 17:07:00 -0400 Subject: [PATCH 04/10] Fix issue where offset & limit won't work Because we were testing the mock client, and not the paginating kapacitor client for the case where limit and offset were provided, an issue with that code path was not exposed. The issues exposed were that the condition was incorrect for triggering this behavior, and no return clause was present to prevent the remainder of the ListTasks method from running. --- kapacitor/kapa_client.go | 4 ++-- kapacitor/kapa_client_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index 603bb486b..0c576d330 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -18,8 +18,8 @@ type PaginatingKapaClient struct { // it fetches them func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) { // only trigger auto-pagination with Offset=0 and Limit=0 - if opts.Limit != 0 && opts.Offset != 0 { - p.KapaClient.ListTasks(opts) + if opts.Limit != 0 || opts.Offset != 0 { + return p.KapaClient.ListTasks(opts) } allTasks := []client.Task{} diff --git a/kapacitor/kapa_client_test.go b/kapacitor/kapa_client_test.go index a1d790fcd..63e1546ec 100644 --- a/kapacitor/kapa_client_test.go +++ b/kapacitor/kapa_client_test.go @@ -38,7 +38,7 @@ func Test_Kapacitor_PaginatingKapaClient(t *testing.T) { } // ensure 100 elems returned when calling mockClient directly - tasks, _ := mockClient.ListTasks(opts) + tasks, _ := pkap.ListTasks(opts) if len(tasks) != 100 { t.Error("Expected calling KapaClient's ListTasks to return", opts.Limit, "items. Received:", len(tasks)) From c3be40513d790f0eb9e54b050411632e04108140 Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Mon, 21 Aug 2017 17:10:13 -0400 Subject: [PATCH 05/10] Perf: improve performance of ListTasks This fetches ListTasks results using a few worker goroutines to improve performance with large numbers of tasks. --- kapacitor/kapa_client.go | 84 ++++++++++++++++++++----- kapacitor/kapa_client_benchmark_test.go | 49 +++++++++++++++ 2 files changed, 116 insertions(+), 17 deletions(-) create mode 100644 kapacitor/kapa_client_benchmark_test.go diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index 0c576d330..31f51bc96 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -1,9 +1,13 @@ package kapacitor import ( + "sync" + client "github.com/influxdata/kapacitor/client/v1" ) +const ListTaskWorkers = 4 + // ensure PaginatingKapaClient is a KapaClient var _ KapaClient = &PaginatingKapaClient{} @@ -24,30 +28,76 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien allTasks := []client.Task{} - allOpts := &client.ListTasksOptions{ - // copy existing fields - TaskOptions: opts.TaskOptions, - Pattern: opts.Pattern, - Fields: opts.Fields, + optChan := make(chan client.ListTasksOptions) + taskChan := make(chan []client.Task, ListTaskWorkers) + done := make(chan struct{}) - // we take control of these two in the loop below - Limit: p.FetchRate, - Offset: 0, + var once sync.Once + + doneCloser := func() { + close(done) } - for { - resp, err := p.KapaClient.ListTasks(allOpts) - if err != nil { - return allTasks, err + go func() { + opts := &client.ListTasksOptions{ + // copy existing fields + TaskOptions: opts.TaskOptions, + Pattern: opts.Pattern, + Fields: opts.Fields, + + // we take control of these two in the loop below + Limit: p.FetchRate, + Offset: 0, } - // break if we've exhausted available tasks - if len(resp) == 0 { - break + for { + opts.Offset = p.FetchRate + opts.Offset + select { + case <-done: + close(optChan) + return + case optChan <- *opts: + // nop + } } + }() - allTasks = append(allTasks, resp...) - allOpts.Offset += len(resp) + var wg sync.WaitGroup + + wg.Add(ListTaskWorkers) + for i := 0; i < ListTaskWorkers; i++ { + go func() { + defer wg.Done() + for opt := range optChan { + resp, err := p.KapaClient.ListTasks(&opt) + if err != nil { + return + } + + // break and stop all workers if we're done + if len(resp) == 0 { + once.Do(doneCloser) + return + } + + // handoff tasks to consumer + taskChan <- resp + } + }() } + + var taskAsm sync.WaitGroup + taskAsm.Add(1) + go func() { + for task := range taskChan { + allTasks = append(allTasks, task...) + } + taskAsm.Done() + }() + + wg.Wait() + close(taskChan) + taskAsm.Wait() + return allTasks, nil } diff --git a/kapacitor/kapa_client_benchmark_test.go b/kapacitor/kapa_client_benchmark_test.go new file mode 100644 index 000000000..83bb88028 --- /dev/null +++ b/kapacitor/kapa_client_benchmark_test.go @@ -0,0 +1,49 @@ +package kapacitor_test + +import ( + "testing" + + "github.com/influxdata/chronograf/kapacitor" + "github.com/influxdata/chronograf/mocks" + client "github.com/influxdata/kapacitor/client/v1" +) + +func BenchmarkKapaClient100(b *testing.B) { benchmark_PaginatingKapaClient(100, b) } +func BenchmarkKapaClient1000(b *testing.B) { benchmark_PaginatingKapaClient(1000, b) } +func BenchmarkKapaClient10000(b *testing.B) { benchmark_PaginatingKapaClient(10000, b) } + +func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) { + // create a mock client that will return a huge response from ListTasks + mockClient := &mocks.KapaClient{ + ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) { + // create all the tasks + allTasks := []client.Task{} + + // create N tasks from the benchmark runner + for i := 0; i < taskCount; i++ { + allTasks = append(allTasks, client.Task{}) + } + begin := opts.Offset + end := opts.Offset + opts.Limit + + if end > len(allTasks) { + end = len(allTasks) + } + + if begin > len(allTasks) { + begin = end + } + + return allTasks[begin:end], nil + }, + } + + pkap := kapacitor.PaginatingKapaClient{mockClient, 50} + + opts := &client.ListTasksOptions{} + + // let the benchmark runner run ListTasks until it's satisfied + for n := 0; n < b.N; n++ { + _, _ = pkap.ListTasks(opts) + } +} From d560ab020a802fd898f83d3bdb1466703848fd3e Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Tue, 22 Aug 2017 10:23:24 -0400 Subject: [PATCH 06/10] Improve benchmark, fix offset bug This ensures that setup time does not impact the results of the benchmark since profiling showed that much time was spent setting up the test array of tasks. This also uses `make` to build that slice instead. Also, the tests revealed that there was a bug where offsets were pre-incremented rather than post-incremented, omitting the first 100 results. --- kapacitor/kapa_client.go | 2 +- kapacitor/kapa_client_benchmark_test.go | 23 ++++++++++++++--------- kapacitor/kapa_client_test.go | 4 ++++ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index 31f51bc96..f0d6c9e48 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -51,7 +51,6 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien } for { - opts.Offset = p.FetchRate + opts.Offset select { case <-done: close(optChan) @@ -59,6 +58,7 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien case optChan <- *opts: // nop } + opts.Offset = p.FetchRate + opts.Offset } }() diff --git a/kapacitor/kapa_client_benchmark_test.go b/kapacitor/kapa_client_benchmark_test.go index 83bb88028..bca4cbebb 100644 --- a/kapacitor/kapa_client_benchmark_test.go +++ b/kapacitor/kapa_client_benchmark_test.go @@ -8,21 +8,23 @@ import ( client "github.com/influxdata/kapacitor/client/v1" ) -func BenchmarkKapaClient100(b *testing.B) { benchmark_PaginatingKapaClient(100, b) } -func BenchmarkKapaClient1000(b *testing.B) { benchmark_PaginatingKapaClient(1000, b) } -func BenchmarkKapaClient10000(b *testing.B) { benchmark_PaginatingKapaClient(10000, b) } +func BenchmarkKapaClient100(b *testing.B) { benchmark_PaginatingKapaClient(100, b) } +func BenchmarkKapaClient1000(b *testing.B) { benchmark_PaginatingKapaClient(1000, b) } +func BenchmarkKapaClient10000(b *testing.B) { benchmark_PaginatingKapaClient(10000, b) } +func BenchmarkKapaClient100000(b *testing.B) { benchmark_PaginatingKapaClient(100000, b) } + +var tasks []client.Task func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) { + + b.StopTimer() // eliminate setup time + // create a mock client that will return a huge response from ListTasks mockClient := &mocks.KapaClient{ ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) { // create all the tasks - allTasks := []client.Task{} + allTasks := make([]client.Task, taskCount) - // create N tasks from the benchmark runner - for i := 0; i < taskCount; i++ { - allTasks = append(allTasks, client.Task{}) - } begin := opts.Offset end := opts.Offset + opts.Limit @@ -42,8 +44,11 @@ func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) { opts := &client.ListTasksOptions{} + b.StartTimer() // eliminate setup time + // let the benchmark runner run ListTasks until it's satisfied for n := 0; n < b.N; n++ { - _, _ = pkap.ListTasks(opts) + // assignment is to avoid having the call optimized away + tasks, _ = pkap.ListTasks(opts) } } diff --git a/kapacitor/kapa_client_test.go b/kapacitor/kapa_client_test.go index 63e1546ec..9dda521e5 100644 --- a/kapacitor/kapa_client_test.go +++ b/kapacitor/kapa_client_test.go @@ -26,6 +26,10 @@ func Test_Kapacitor_PaginatingKapaClient(t *testing.T) { end = len(allTasks) } + if begin > len(allTasks) { + begin = end + } + return allTasks[begin:end], nil }, } From a3db471abeab3db4f5ade40f4f895108914050a9 Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Wed, 23 Aug 2017 16:21:13 -0400 Subject: [PATCH 07/10] Extract option generator and task fetcher The option generator goroutine and the task fetcher goroutine are more readable when extracted into another method. Also, added some commenting documenting what their expectations are --- kapacitor/kapa_client.go | 90 ++++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index f0d6c9e48..27acfa6b1 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -34,56 +34,13 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien var once sync.Once - doneCloser := func() { - close(done) - } - - go func() { - opts := &client.ListTasksOptions{ - // copy existing fields - TaskOptions: opts.TaskOptions, - Pattern: opts.Pattern, - Fields: opts.Fields, - - // we take control of these two in the loop below - Limit: p.FetchRate, - Offset: 0, - } - - for { - select { - case <-done: - close(optChan) - return - case optChan <- *opts: - // nop - } - opts.Offset = p.FetchRate + opts.Offset - } - }() + go p.generateKapacitorOptions(optChan, *opts, done) var wg sync.WaitGroup wg.Add(ListTaskWorkers) for i := 0; i < ListTaskWorkers; i++ { - go func() { - defer wg.Done() - for opt := range optChan { - resp, err := p.KapaClient.ListTasks(&opt) - if err != nil { - return - } - - // break and stop all workers if we're done - if len(resp) == 0 { - once.Do(doneCloser) - return - } - - // handoff tasks to consumer - taskChan <- resp - } - }() + go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done) } var taskAsm sync.WaitGroup @@ -101,3 +58,46 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien return allTasks, nil } + +// fetchFromKapacitor fetches a set of results from a kapacitor by reading a +// set of options from the provided optChan. Fetched tasks are pushed onto the +// provided taskChan +func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, wg *sync.WaitGroup, closer *sync.Once, taskChan chan []client.Task, done chan struct{}) { + defer wg.Done() + for opt := range optChan { + resp, err := p.KapaClient.ListTasks(&opt) + if err != nil { + return + } + + // break and stop all workers if we're done + if len(resp) == 0 { + closer.Do(func() { + close(done) + }) + return + } + + // handoff tasks to consumer + taskChan <- resp + } +} + +// generateKapacitorOptions creates ListTasksOptions with incrementally greater +// Limit and Offset parameters, and inserts them into the provided optChan +func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) { + // ensure Limit and Offset start from known quantities + opts.Limit = p.FetchRate + opts.Offset = 0 + + for { + select { + case <-done: + close(optChan) + return + case optChan <- opts: + // nop + } + opts.Offset = p.FetchRate + opts.Offset + } +} From 31abf6dfd898077f3741d1bbb67d66dd22711386 Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Wed, 23 Aug 2017 16:32:00 -0400 Subject: [PATCH 08/10] Explain magic number, var renaming This extracts a constant and comments it for clarity. Also renames a confusing waitgroup --- kapacitor/kapa_client.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go index 27acfa6b1..7d5db8a29 100644 --- a/kapacitor/kapa_client.go +++ b/kapacitor/kapa_client.go @@ -6,7 +6,17 @@ import ( client "github.com/influxdata/kapacitor/client/v1" ) -const ListTaskWorkers = 4 +const ( + // ListTaskWorkers describes the number of workers concurrently fetching + // tasks from Kapacitor. This constant was chosen after some benchmarking + // work and should likely work well for quad-core systems + ListTaskWorkers = 4 + + // TaskGatherers is the number of workers collating responses from + // ListTaskWorkers. There can only be one without additional synchronization + // around the output buffer from ListTasks + TaskGatherers = 1 +) // ensure PaginatingKapaClient is a KapaClient var _ KapaClient = &PaginatingKapaClient{} @@ -43,18 +53,18 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done) } - var taskAsm sync.WaitGroup - taskAsm.Add(1) + var gatherWg sync.WaitGroup + gatherWg.Add(TaskGatherers) go func() { for task := range taskChan { allTasks = append(allTasks, task...) } - taskAsm.Done() + gatherWg.Done() }() wg.Wait() close(taskChan) - taskAsm.Wait() + gatherWg.Wait() return allTasks, nil } From da55f96f9d2a130fc2c449cbf566f76be0caf668 Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Wed, 23 Aug 2017 16:34:50 -0400 Subject: [PATCH 09/10] Remove kapacitor mock logic This logic was originally left in place to help future test writers, but its presence was vexing because it was not exercised in existing test cases. It has been commented out should future tests need to leverage it. --- server/kapacitors_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/kapacitors_test.go b/server/kapacitors_test.go index 6d4ba76e7..8aa7328de 100644 --- a/server/kapacitors_test.go +++ b/server/kapacitors_test.go @@ -142,11 +142,12 @@ func Test_KapacitorRulesGet(t *testing.T) { tasks = map[string]interface{}{ "tasks": tsks[offset:], } - } else { - tasks = map[string]interface{}{ - "tasks": tsks[offset : offset+limit], - } } + //} else { + //tasks = map[string]interface{}{ + //"tasks": tsks[offset : offset+limit], + //} + //} err = json.NewEncoder(rw).Encode(&tasks) if err != nil { From e9fda552b4f9eb1389adc6c1803722441b19c9db Mon Sep 17 00:00:00 2001 From: Tim Raymond Date: Wed, 23 Aug 2017 16:42:51 -0400 Subject: [PATCH 10/10] [ci skip] Update CHANGELOG Adds note on kapacitor alert rule pagination no longer being limited to 100 alert rules. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fd514d39..7102719db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ 1. [#1870](https://github.com/influxdata/chronograf/pull/1870): Fix console error for placing prop on div 1. [#1864](https://github.com/influxdata/chronograf/pull/1864): Fix Write Data form upload button and add `onDragExit` handler 1. [#1866](https://github.com/influxdata/chronograf/pull/1866): Fix missing cell type (and consequently single-stat) +1. [#1886](https://github.com/influxdata/chronograf/pull/1886): Fix limit of 100 alert rules on alert rules page ### Features