diff --git a/CHANGELOG.md b/CHANGELOG.md index d9d9d1f25..62a8da107 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ 1. [#1715](https://github.com/influxdata/chronograf/pull/1715): Chronograf now renders on IE11. 1. [#1870](https://github.com/influxdata/chronograf/pull/1870): Fix console error for placing prop on div 1. [#1864](https://github.com/influxdata/chronograf/pull/1864): Fix Write Data form upload button and add `onDragExit` handler +1. [#1866](https://github.com/influxdata/chronograf/pull/1866): Fix missing cell type (and consequently single-stat) +1. [#1886](https://github.com/influxdata/chronograf/pull/1886): Fix limit of 100 alert rules on alert rules page + 1. [#1891](https://github.com/influxdata/chronograf/pull/1891): Fix Kapacitor config for PagerDuty via the UI 1. [#1897](https://github.com/influxdata/chronograf/pull/1897): Fix regression from [#1864](https://github.com/influxdata/chronograf/pull/1864) and redesign drag & drop interaction 1. [#1872](https://github.com/influxdata/chronograf/pull/1872): Prevent stats in the legend from wrapping line diff --git a/kapacitor/client.go b/kapacitor/client.go index 3f516d8ee..b7687f3d9 100644 --- a/kapacitor/client.go +++ b/kapacitor/client.go @@ -12,6 +12,9 @@ import ( const ( // Prefix is prepended to the ID of all alerts Prefix = "chronograf-v1-" + + // FetchRate is the rate Paginating Kapacitor Clients will consume responses + FetchRate = 100 ) // Client communicates to kapacitor @@ -325,8 +328,14 @@ func NewKapaClient(url, username, password string) (KapaClient, error) { } } - return client.New(client.Config{ + clnt, err := client.New(client.Config{ URL: url, Credentials: creds, }) + + if err != nil { + return clnt, err + } + + return &PaginatingKapaClient{clnt, FetchRate}, nil } diff --git a/kapacitor/kapa_client.go b/kapacitor/kapa_client.go new file mode 100644 index 000000000..7d5db8a29 --- /dev/null +++ b/kapacitor/kapa_client.go @@ -0,0 +1,113 @@ +package kapacitor + +import ( + "sync" + + client "github.com/influxdata/kapacitor/client/v1" +) + +const ( + // ListTaskWorkers describes the number of workers concurrently fetching + // tasks from Kapacitor. This constant was chosen after some benchmarking + // work and should likely work well for quad-core systems + ListTaskWorkers = 4 + + // TaskGatherers is the number of workers collating responses from + // ListTaskWorkers. There can only be one without additional synchronization + // around the output buffer from ListTasks + TaskGatherers = 1 +) + +// ensure PaginatingKapaClient is a KapaClient +var _ KapaClient = &PaginatingKapaClient{} + +// PaginatingKapaClient is a Kapacitor client that automatically navigates +// through Kapacitor's pagination to fetch all results +type PaginatingKapaClient struct { + KapaClient + FetchRate int // specifies the number of elements to fetch from Kapacitor at a time +} + +// ListTasks lists all available tasks from Kapacitor, navigating pagination as +// it fetches them +func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) { + // only trigger auto-pagination with Offset=0 and Limit=0 + if opts.Limit != 0 || opts.Offset != 0 { + return p.KapaClient.ListTasks(opts) + } + + allTasks := []client.Task{} + + optChan := make(chan client.ListTasksOptions) + taskChan := make(chan []client.Task, ListTaskWorkers) + done := make(chan struct{}) + + var once sync.Once + + go p.generateKapacitorOptions(optChan, *opts, done) + + var wg sync.WaitGroup + + wg.Add(ListTaskWorkers) + for i := 0; i < ListTaskWorkers; i++ { + go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done) + } + + var gatherWg sync.WaitGroup + gatherWg.Add(TaskGatherers) + go func() { + for task := range taskChan { + allTasks = append(allTasks, task...) + } + gatherWg.Done() + }() + + wg.Wait() + close(taskChan) + gatherWg.Wait() + + return allTasks, nil +} + +// fetchFromKapacitor fetches a set of results from a kapacitor by reading a +// set of options from the provided optChan. Fetched tasks are pushed onto the +// provided taskChan +func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, wg *sync.WaitGroup, closer *sync.Once, taskChan chan []client.Task, done chan struct{}) { + defer wg.Done() + for opt := range optChan { + resp, err := p.KapaClient.ListTasks(&opt) + if err != nil { + return + } + + // break and stop all workers if we're done + if len(resp) == 0 { + closer.Do(func() { + close(done) + }) + return + } + + // handoff tasks to consumer + taskChan <- resp + } +} + +// generateKapacitorOptions creates ListTasksOptions with incrementally greater +// Limit and Offset parameters, and inserts them into the provided optChan +func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) { + // ensure Limit and Offset start from known quantities + opts.Limit = p.FetchRate + opts.Offset = 0 + + for { + select { + case <-done: + close(optChan) + return + case optChan <- opts: + // nop + } + opts.Offset = p.FetchRate + opts.Offset + } +} diff --git a/kapacitor/kapa_client_benchmark_test.go b/kapacitor/kapa_client_benchmark_test.go new file mode 100644 index 000000000..bca4cbebb --- /dev/null +++ b/kapacitor/kapa_client_benchmark_test.go @@ -0,0 +1,54 @@ +package kapacitor_test + +import ( + "testing" + + "github.com/influxdata/chronograf/kapacitor" + "github.com/influxdata/chronograf/mocks" + client "github.com/influxdata/kapacitor/client/v1" +) + +func BenchmarkKapaClient100(b *testing.B) { benchmark_PaginatingKapaClient(100, b) } +func BenchmarkKapaClient1000(b *testing.B) { benchmark_PaginatingKapaClient(1000, b) } +func BenchmarkKapaClient10000(b *testing.B) { benchmark_PaginatingKapaClient(10000, b) } +func BenchmarkKapaClient100000(b *testing.B) { benchmark_PaginatingKapaClient(100000, b) } + +var tasks []client.Task + +func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) { + + b.StopTimer() // eliminate setup time + + // create a mock client that will return a huge response from ListTasks + mockClient := &mocks.KapaClient{ + ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) { + // create all the tasks + allTasks := make([]client.Task, taskCount) + + begin := opts.Offset + end := opts.Offset + opts.Limit + + if end > len(allTasks) { + end = len(allTasks) + } + + if begin > len(allTasks) { + begin = end + } + + return allTasks[begin:end], nil + }, + } + + pkap := kapacitor.PaginatingKapaClient{mockClient, 50} + + opts := &client.ListTasksOptions{} + + b.StartTimer() // eliminate setup time + + // let the benchmark runner run ListTasks until it's satisfied + for n := 0; n < b.N; n++ { + // assignment is to avoid having the call optimized away + tasks, _ = pkap.ListTasks(opts) + } +} diff --git a/kapacitor/kapa_client_test.go b/kapacitor/kapa_client_test.go new file mode 100644 index 000000000..9dda521e5 --- /dev/null +++ b/kapacitor/kapa_client_test.go @@ -0,0 +1,58 @@ +package kapacitor_test + +import ( + "testing" + + "github.com/influxdata/chronograf/kapacitor" + "github.com/influxdata/chronograf/mocks" + client "github.com/influxdata/kapacitor/client/v1" +) + +func Test_Kapacitor_PaginatingKapaClient(t *testing.T) { + const lenAllTasks = 227 // prime, to stress odd result sets + + // create a mock client that will return a huge response from ListTasks + mockClient := &mocks.KapaClient{ + ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) { + // create all the tasks + allTasks := []client.Task{} + for i := 0; i < lenAllTasks; i++ { + allTasks = append(allTasks, client.Task{}) + } + begin := opts.Offset + end := opts.Offset + opts.Limit + + if end > len(allTasks) { + end = len(allTasks) + } + + if begin > len(allTasks) { + begin = end + } + + return allTasks[begin:end], nil + }, + } + + pkap := kapacitor.PaginatingKapaClient{mockClient, 50} + + opts := &client.ListTasksOptions{ + Limit: 100, + Offset: 0, + } + + // ensure 100 elems returned when calling mockClient directly + tasks, _ := pkap.ListTasks(opts) + + if len(tasks) != 100 { + t.Error("Expected calling KapaClient's ListTasks to return", opts.Limit, "items. Received:", len(tasks)) + } + + // ensure PaginatingKapaClient returns _all_ tasks with 0 value for Limit and Offset + allOpts := &client.ListTasksOptions{} + allTasks, _ := pkap.ListTasks(allOpts) + + if len(allTasks) != lenAllTasks { + t.Error("PaginatingKapaClient: Expected to find", lenAllTasks, "tasks but found", len(allTasks)) + } +} diff --git a/mocks/kapacitor_client.go b/mocks/kapacitor_client.go new file mode 100644 index 000000000..2d5e218bf --- /dev/null +++ b/mocks/kapacitor_client.go @@ -0,0 +1,37 @@ +package mocks + +import ( + "github.com/influxdata/chronograf/kapacitor" + client "github.com/influxdata/kapacitor/client/v1" +) + +var _ kapacitor.KapaClient = &KapaClient{} + +// Client is a mock Kapacitor client +type KapaClient struct { + CreateTaskF func(opts client.CreateTaskOptions) (client.Task, error) + DeleteTaskF func(link client.Link) error + ListTasksF func(opts *client.ListTasksOptions) ([]client.Task, error) + TaskF func(link client.Link, opts *client.TaskOptions) (client.Task, error) + UpdateTaskF func(link client.Link, opts client.UpdateTaskOptions) (client.Task, error) +} + +func (p *KapaClient) CreateTask(opts client.CreateTaskOptions) (client.Task, error) { + return p.CreateTaskF(opts) +} + +func (p *KapaClient) DeleteTask(link client.Link) error { + return p.DeleteTaskF(link) +} + +func (p *KapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) { + return p.ListTasksF(opts) +} + +func (p *KapaClient) Task(link client.Link, opts *client.TaskOptions) (client.Task, error) { + return p.TaskF(link, opts) +} + +func (p *KapaClient) UpdateTask(link client.Link, opts client.UpdateTaskOptions) (client.Task, error) { + return p.UpdateTaskF(link, opts) +} diff --git a/server/kapacitors_test.go b/server/kapacitors_test.go index 116aca560..8aa7328de 100644 --- a/server/kapacitors_test.go +++ b/server/kapacitors_test.go @@ -1,11 +1,30 @@ -package server +package server_test import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strconv" + "strings" "testing" + "github.com/bouk/httprouter" + "github.com/google/go-cmp/cmp" "github.com/influxdata/chronograf" + "github.com/influxdata/chronograf/mocks" + "github.com/influxdata/chronograf/server" ) +const tickScript = ` +stream + |from() + .measurement('cpu') + |alert() + .crit(lambda: "usage_idle" < 10) + .log('/tmp/alert') +` + func TestValidRuleRequest(t *testing.T) { tests := []struct { name string @@ -48,9 +67,161 @@ func TestValidRuleRequest(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := ValidRuleRequest(tt.rule); (err != nil) != tt.wantErr { + if err := server.ValidRuleRequest(tt.rule); (err != nil) != tt.wantErr { t.Errorf("ValidRuleRequest() error = %v, wantErr %v", err, tt.wantErr) } }) } } + +func Test_KapacitorRulesGet(t *testing.T) { + kapaTests := []struct { + name string + requestPath string + mockAlerts []chronograf.AlertRule + expected []chronograf.AlertRule + }{ + { + "basic", + "/chronograf/v1/sources/1/kapacitors/1/rules", + []chronograf.AlertRule{ + { + ID: "cpu_alert", + Name: "cpu_alert", + }, + }, + []chronograf.AlertRule{ + { + ID: "cpu_alert", + Name: "cpu_alert", + Alerts: []string{}, + }, + }, + }, + } + + for _, test := range kapaTests { + test := test // needed to avoid data race + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // setup mock kapa API + kapaSrv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + params := r.URL.Query() + limit, err := strconv.Atoi(params.Get("limit")) + if err != nil { + rw.WriteHeader(http.StatusBadRequest) + return + } + offset, err := strconv.Atoi(params.Get("offset")) + if err != nil { + rw.WriteHeader(http.StatusBadRequest) + return + } + + tsks := []map[string]interface{}{} + for _, task := range test.mockAlerts { + tsks = append(tsks, map[string]interface{}{ + "id": task.ID, + "script": tickScript, + "status": "enabled", + "link": map[string]interface{}{ + "rel": "self", + "href": "/kapacitor/v1/tasks/cpu_alert", + }, + }) + } + + var tasks map[string]interface{} + + if offset >= len(tsks) { + tasks = map[string]interface{}{ + "tasks": []map[string]interface{}{}, + } + } else if limit+offset > len(tsks) { + tasks = map[string]interface{}{ + "tasks": tsks[offset:], + } + } + //} else { + //tasks = map[string]interface{}{ + //"tasks": tsks[offset : offset+limit], + //} + //} + + err = json.NewEncoder(rw).Encode(&tasks) + if err != nil { + t.Error("Failed to encode JSON. err:", err) + } + })) + defer kapaSrv.Close() + + // setup mock service and test logger + testLogger := mocks.TestLogger{} + svc := &server.Service{ + ServersStore: &mocks.ServersStore{ + GetF: func(ctx context.Context, ID int) (chronograf.Server, error) { + return chronograf.Server{ + SrcID: ID, + URL: kapaSrv.URL, + }, nil + }, + }, + Logger: &testLogger, + } + + // setup request and response recorder + req := httptest.NewRequest("GET", test.requestPath, strings.NewReader("")) + rr := httptest.NewRecorder() + + // setup context and request params + bg := context.Background() + params := httprouter.Params{ + { + "id", + "1", + }, + { + "kid", + "1", + }, + } + ctx := httprouter.WithParams(bg, params) + req = req.WithContext(ctx) + + // invoke KapacitorRulesGet endpoint + svc.KapacitorRulesGet(rr, req) + + // destructure response + frame := struct { + Rules []struct { + chronograf.AlertRule + TICKScript json.RawMessage `json:"tickscript"` + Status json.RawMessage `json:"status"` + Links json.RawMessage `json:"links"` + } `json:"rules"` + }{} + + resp := rr.Result() + + err := json.NewDecoder(resp.Body).Decode(&frame) + if err != nil { + t.Fatal("Err decoding kapa rule response: err:", err) + } + + actual := make([]chronograf.AlertRule, len(frame.Rules)) + + for idx, _ := range frame.Rules { + actual[idx] = frame.Rules[idx].AlertRule + } + + if resp.StatusCode != http.StatusOK { + t.Fatal("Expected HTTP 200 OK but got", resp.Status) + } + + if !cmp.Equal(test.expected, actual) { + t.Fatalf("%q - Alert rules differ! diff:\n%s\n", test.name, cmp.Diff(test.expected, actual)) + } + }) + } +}