Merge pull request #1886 from influxdata/feature/tr-kapa-rule-pagination

Pagination when fetching Kapacitor Rules
pull/1933/head
Timothy J. Raymond 2017-08-23 17:00:20 -04:00 committed by GitHub
commit ece8cf3fed
7 changed files with 448 additions and 3 deletions

View File

@ -4,6 +4,9 @@
1. [#1715](https://github.com/influxdata/chronograf/pull/1715): Chronograf now renders on IE11.
1. [#1870](https://github.com/influxdata/chronograf/pull/1870): Fix console error for placing prop on div
1. [#1864](https://github.com/influxdata/chronograf/pull/1864): Fix Write Data form upload button and add `onDragExit` handler
1. [#1866](https://github.com/influxdata/chronograf/pull/1866): Fix missing cell type (and consequently single-stat)
1. [#1886](https://github.com/influxdata/chronograf/pull/1886): Fix limit of 100 alert rules on alert rules page
1. [#1891](https://github.com/influxdata/chronograf/pull/1891): Fix Kapacitor config for PagerDuty via the UI
1. [#1897](https://github.com/influxdata/chronograf/pull/1897): Fix regression from [#1864](https://github.com/influxdata/chronograf/pull/1864) and redesign drag & drop interaction
1. [#1872](https://github.com/influxdata/chronograf/pull/1872): Prevent stats in the legend from wrapping line

View File

@ -12,6 +12,9 @@ import (
const (
// Prefix is prepended to the ID of all alerts
Prefix = "chronograf-v1-"
// FetchRate is the rate Paginating Kapacitor Clients will consume responses
FetchRate = 100
)
// Client communicates to kapacitor
@ -325,8 +328,14 @@ func NewKapaClient(url, username, password string) (KapaClient, error) {
}
}
return client.New(client.Config{
clnt, err := client.New(client.Config{
URL: url,
Credentials: creds,
})
if err != nil {
return clnt, err
}
return &PaginatingKapaClient{clnt, FetchRate}, nil
}

113
kapacitor/kapa_client.go Normal file
View File

@ -0,0 +1,113 @@
package kapacitor
import (
"sync"
client "github.com/influxdata/kapacitor/client/v1"
)
const (
// ListTaskWorkers describes the number of workers concurrently fetching
// tasks from Kapacitor. This constant was chosen after some benchmarking
// work and should likely work well for quad-core systems
ListTaskWorkers = 4
// TaskGatherers is the number of workers collating responses from
// ListTaskWorkers. There can only be one without additional synchronization
// around the output buffer from ListTasks
TaskGatherers = 1
)
// ensure PaginatingKapaClient is a KapaClient
var _ KapaClient = &PaginatingKapaClient{}
// PaginatingKapaClient is a Kapacitor client that automatically navigates
// through Kapacitor's pagination to fetch all results
type PaginatingKapaClient struct {
KapaClient
FetchRate int // specifies the number of elements to fetch from Kapacitor at a time
}
// ListTasks lists all available tasks from Kapacitor, navigating pagination as
// it fetches them
func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) {
// only trigger auto-pagination with Offset=0 and Limit=0
if opts.Limit != 0 || opts.Offset != 0 {
return p.KapaClient.ListTasks(opts)
}
allTasks := []client.Task{}
optChan := make(chan client.ListTasksOptions)
taskChan := make(chan []client.Task, ListTaskWorkers)
done := make(chan struct{})
var once sync.Once
go p.generateKapacitorOptions(optChan, *opts, done)
var wg sync.WaitGroup
wg.Add(ListTaskWorkers)
for i := 0; i < ListTaskWorkers; i++ {
go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done)
}
var gatherWg sync.WaitGroup
gatherWg.Add(TaskGatherers)
go func() {
for task := range taskChan {
allTasks = append(allTasks, task...)
}
gatherWg.Done()
}()
wg.Wait()
close(taskChan)
gatherWg.Wait()
return allTasks, nil
}
// fetchFromKapacitor fetches a set of results from a kapacitor by reading a
// set of options from the provided optChan. Fetched tasks are pushed onto the
// provided taskChan
func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, wg *sync.WaitGroup, closer *sync.Once, taskChan chan []client.Task, done chan struct{}) {
defer wg.Done()
for opt := range optChan {
resp, err := p.KapaClient.ListTasks(&opt)
if err != nil {
return
}
// break and stop all workers if we're done
if len(resp) == 0 {
closer.Do(func() {
close(done)
})
return
}
// handoff tasks to consumer
taskChan <- resp
}
}
// generateKapacitorOptions creates ListTasksOptions with incrementally greater
// Limit and Offset parameters, and inserts them into the provided optChan
func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) {
// ensure Limit and Offset start from known quantities
opts.Limit = p.FetchRate
opts.Offset = 0
for {
select {
case <-done:
close(optChan)
return
case optChan <- opts:
// nop
}
opts.Offset = p.FetchRate + opts.Offset
}
}

View File

@ -0,0 +1,54 @@
package kapacitor_test
import (
"testing"
"github.com/influxdata/chronograf/kapacitor"
"github.com/influxdata/chronograf/mocks"
client "github.com/influxdata/kapacitor/client/v1"
)
func BenchmarkKapaClient100(b *testing.B) { benchmark_PaginatingKapaClient(100, b) }
func BenchmarkKapaClient1000(b *testing.B) { benchmark_PaginatingKapaClient(1000, b) }
func BenchmarkKapaClient10000(b *testing.B) { benchmark_PaginatingKapaClient(10000, b) }
func BenchmarkKapaClient100000(b *testing.B) { benchmark_PaginatingKapaClient(100000, b) }
var tasks []client.Task
func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) {
b.StopTimer() // eliminate setup time
// create a mock client that will return a huge response from ListTasks
mockClient := &mocks.KapaClient{
ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) {
// create all the tasks
allTasks := make([]client.Task, taskCount)
begin := opts.Offset
end := opts.Offset + opts.Limit
if end > len(allTasks) {
end = len(allTasks)
}
if begin > len(allTasks) {
begin = end
}
return allTasks[begin:end], nil
},
}
pkap := kapacitor.PaginatingKapaClient{mockClient, 50}
opts := &client.ListTasksOptions{}
b.StartTimer() // eliminate setup time
// let the benchmark runner run ListTasks until it's satisfied
for n := 0; n < b.N; n++ {
// assignment is to avoid having the call optimized away
tasks, _ = pkap.ListTasks(opts)
}
}

View File

@ -0,0 +1,58 @@
package kapacitor_test
import (
"testing"
"github.com/influxdata/chronograf/kapacitor"
"github.com/influxdata/chronograf/mocks"
client "github.com/influxdata/kapacitor/client/v1"
)
func Test_Kapacitor_PaginatingKapaClient(t *testing.T) {
const lenAllTasks = 227 // prime, to stress odd result sets
// create a mock client that will return a huge response from ListTasks
mockClient := &mocks.KapaClient{
ListTasksF: func(opts *client.ListTasksOptions) ([]client.Task, error) {
// create all the tasks
allTasks := []client.Task{}
for i := 0; i < lenAllTasks; i++ {
allTasks = append(allTasks, client.Task{})
}
begin := opts.Offset
end := opts.Offset + opts.Limit
if end > len(allTasks) {
end = len(allTasks)
}
if begin > len(allTasks) {
begin = end
}
return allTasks[begin:end], nil
},
}
pkap := kapacitor.PaginatingKapaClient{mockClient, 50}
opts := &client.ListTasksOptions{
Limit: 100,
Offset: 0,
}
// ensure 100 elems returned when calling mockClient directly
tasks, _ := pkap.ListTasks(opts)
if len(tasks) != 100 {
t.Error("Expected calling KapaClient's ListTasks to return", opts.Limit, "items. Received:", len(tasks))
}
// ensure PaginatingKapaClient returns _all_ tasks with 0 value for Limit and Offset
allOpts := &client.ListTasksOptions{}
allTasks, _ := pkap.ListTasks(allOpts)
if len(allTasks) != lenAllTasks {
t.Error("PaginatingKapaClient: Expected to find", lenAllTasks, "tasks but found", len(allTasks))
}
}

37
mocks/kapacitor_client.go Normal file
View File

@ -0,0 +1,37 @@
package mocks
import (
"github.com/influxdata/chronograf/kapacitor"
client "github.com/influxdata/kapacitor/client/v1"
)
var _ kapacitor.KapaClient = &KapaClient{}
// Client is a mock Kapacitor client
type KapaClient struct {
CreateTaskF func(opts client.CreateTaskOptions) (client.Task, error)
DeleteTaskF func(link client.Link) error
ListTasksF func(opts *client.ListTasksOptions) ([]client.Task, error)
TaskF func(link client.Link, opts *client.TaskOptions) (client.Task, error)
UpdateTaskF func(link client.Link, opts client.UpdateTaskOptions) (client.Task, error)
}
func (p *KapaClient) CreateTask(opts client.CreateTaskOptions) (client.Task, error) {
return p.CreateTaskF(opts)
}
func (p *KapaClient) DeleteTask(link client.Link) error {
return p.DeleteTaskF(link)
}
func (p *KapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) {
return p.ListTasksF(opts)
}
func (p *KapaClient) Task(link client.Link, opts *client.TaskOptions) (client.Task, error) {
return p.TaskF(link, opts)
}
func (p *KapaClient) UpdateTask(link client.Link, opts client.UpdateTaskOptions) (client.Task, error) {
return p.UpdateTaskF(link, opts)
}

View File

@ -1,11 +1,30 @@
package server
package server_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"github.com/bouk/httprouter"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/chronograf"
"github.com/influxdata/chronograf/mocks"
"github.com/influxdata/chronograf/server"
)
const tickScript = `
stream
|from()
.measurement('cpu')
|alert()
.crit(lambda: "usage_idle" < 10)
.log('/tmp/alert')
`
func TestValidRuleRequest(t *testing.T) {
tests := []struct {
name string
@ -48,9 +67,161 @@ func TestValidRuleRequest(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := ValidRuleRequest(tt.rule); (err != nil) != tt.wantErr {
if err := server.ValidRuleRequest(tt.rule); (err != nil) != tt.wantErr {
t.Errorf("ValidRuleRequest() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func Test_KapacitorRulesGet(t *testing.T) {
kapaTests := []struct {
name string
requestPath string
mockAlerts []chronograf.AlertRule
expected []chronograf.AlertRule
}{
{
"basic",
"/chronograf/v1/sources/1/kapacitors/1/rules",
[]chronograf.AlertRule{
{
ID: "cpu_alert",
Name: "cpu_alert",
},
},
[]chronograf.AlertRule{
{
ID: "cpu_alert",
Name: "cpu_alert",
Alerts: []string{},
},
},
},
}
for _, test := range kapaTests {
test := test // needed to avoid data race
t.Run(test.name, func(t *testing.T) {
t.Parallel()
// setup mock kapa API
kapaSrv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
limit, err := strconv.Atoi(params.Get("limit"))
if err != nil {
rw.WriteHeader(http.StatusBadRequest)
return
}
offset, err := strconv.Atoi(params.Get("offset"))
if err != nil {
rw.WriteHeader(http.StatusBadRequest)
return
}
tsks := []map[string]interface{}{}
for _, task := range test.mockAlerts {
tsks = append(tsks, map[string]interface{}{
"id": task.ID,
"script": tickScript,
"status": "enabled",
"link": map[string]interface{}{
"rel": "self",
"href": "/kapacitor/v1/tasks/cpu_alert",
},
})
}
var tasks map[string]interface{}
if offset >= len(tsks) {
tasks = map[string]interface{}{
"tasks": []map[string]interface{}{},
}
} else if limit+offset > len(tsks) {
tasks = map[string]interface{}{
"tasks": tsks[offset:],
}
}
//} else {
//tasks = map[string]interface{}{
//"tasks": tsks[offset : offset+limit],
//}
//}
err = json.NewEncoder(rw).Encode(&tasks)
if err != nil {
t.Error("Failed to encode JSON. err:", err)
}
}))
defer kapaSrv.Close()
// setup mock service and test logger
testLogger := mocks.TestLogger{}
svc := &server.Service{
ServersStore: &mocks.ServersStore{
GetF: func(ctx context.Context, ID int) (chronograf.Server, error) {
return chronograf.Server{
SrcID: ID,
URL: kapaSrv.URL,
}, nil
},
},
Logger: &testLogger,
}
// setup request and response recorder
req := httptest.NewRequest("GET", test.requestPath, strings.NewReader(""))
rr := httptest.NewRecorder()
// setup context and request params
bg := context.Background()
params := httprouter.Params{
{
"id",
"1",
},
{
"kid",
"1",
},
}
ctx := httprouter.WithParams(bg, params)
req = req.WithContext(ctx)
// invoke KapacitorRulesGet endpoint
svc.KapacitorRulesGet(rr, req)
// destructure response
frame := struct {
Rules []struct {
chronograf.AlertRule
TICKScript json.RawMessage `json:"tickscript"`
Status json.RawMessage `json:"status"`
Links json.RawMessage `json:"links"`
} `json:"rules"`
}{}
resp := rr.Result()
err := json.NewDecoder(resp.Body).Decode(&frame)
if err != nil {
t.Fatal("Err decoding kapa rule response: err:", err)
}
actual := make([]chronograf.AlertRule, len(frame.Rules))
for idx, _ := range frame.Rules {
actual[idx] = frame.Rules[idx].AlertRule
}
if resp.StatusCode != http.StatusOK {
t.Fatal("Expected HTTP 200 OK but got", resp.Status)
}
if !cmp.Equal(test.expected, actual) {
t.Fatalf("%q - Alert rules differ! diff:\n%s\n", test.name, cmp.Diff(test.expected, actual))
}
})
}
}