Merge pull request #5869 from influxdata/5460/list_rules_params

feat(server): optimize Alert Rules API
pull/5873/head
Pavel Závora 2022-02-22 15:14:07 +01:00 committed by GitHub
commit 72ee515cfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 297 additions and 69 deletions

View File

@ -5,6 +5,7 @@
1. [#5852](https://github.com/influxdata/chronograf/pull/5852): Add Flux Query Builder.
1. [#5858](https://github.com/influxdata/chronograf/pull/5858): Use time range in flux Schema Explorer.
1. [#5868](https://github.com/influxdata/chronograf/pull/5868): Move Flux Tasks to own page.
1. [#5869](https://github.com/influxdata/chronograf/pull/5869): Optimize Alert Rules API.
### Bug Fixes

View File

@ -4,8 +4,6 @@ import (
"context"
"fmt"
"net/http"
"regexp"
"strings"
"github.com/influxdata/chronograf"
"github.com/influxdata/chronograf/id"
@ -68,10 +66,13 @@ type Task struct {
TICKScript chronograf.TICKScript // TICKScript is the running script
}
var reTaskName = regexp.MustCompile(`[\r\n]*var[ \t]+name[ \t]+=[ \t]+'([^\n]+)'[ \r\t]*\n`)
// NewTask creates a task from a kapacitor client task.
// The TICKscript is always parsed into a chronograf alert rule; use
// NewTaskWithParsing to control whether parsing is performed.
func NewTask(task *client.Task) *Task {
	return NewTaskWithParsing(task, true)
}
// NewTask creates a task from a kapacitor client task with optional parsing of alert rule
func NewTaskWithParsing(task *client.Task, parse bool) *Task {
dbrps := make([]chronograf.DBRP, len(task.DBRPs))
for i := range task.DBRPs {
dbrps[i].DB = task.DBRPs[i].Database
@ -80,26 +81,20 @@ func NewTask(task *client.Task) *Task {
script := chronograf.TICKScript(task.TICKscript)
var rule chronograf.AlertRule = chronograf.AlertRule{Query: nil}
if task.TemplateID == "" {
if parse && task.TemplateID == "" {
// try to parse chronograf rule, tasks created from template cannot be chronograf rules
if parsedRule, err := Reverse(script); err == nil {
rule = parsedRule
// #5403 override name when defined in a variable
if nameVar, exists := task.Vars["name"]; exists {
if val, isString := nameVar.Value.(string); isString && val != "" {
rule.Name = val
}
}
}
}
if rule.Name == "" {
// try to parse Name from a line such as: `var name = 'Rule Name'`
if matches := reTaskName.FindStringSubmatch(task.TICKscript); matches != nil {
rule.Name = strings.ReplaceAll(strings.ReplaceAll(matches[1], "\\'", "'"), "\\\\", "\\")
} else {
rule.Name = task.ID
}
}
// #5403 override name when defined in a variable
if nameVar, exists := task.Vars["name"]; exists {
if val, isString := nameVar.Value.(string); isString && val != "" {
rule.Name = val
}
rule.Name = GetAlertRuleName(task)
}
rule.Vars = task.Vars
@ -279,13 +274,16 @@ func (c *Client) status(ctx context.Context, href string) (client.TaskStatus, er
// All returns all tasks in kapacitor, with alert-rule parsing enabled
// for every task and no list filtering or pagination options applied.
func (c *Client) All(ctx context.Context) (map[string]*Task, error) {
	return c.List(ctx, &client.ListTasksOptions{}, true)
}
// List kapacitor tasks according to options supplied
func (c *Client) List(ctx context.Context, opts *client.ListTasksOptions, parseRules bool) (map[string]*Task, error) {
kapa, err := c.kapaClient(c.URL, c.Username, c.Password, c.InsecureSkipVerify)
if err != nil {
return nil, err
}
// Only get the status, id and link section back
opts := &client.ListTasksOptions{}
tasks, err := kapa.ListTasks(opts)
if err != nil {
return nil, err
@ -293,7 +291,7 @@ func (c *Client) All(ctx context.Context) (map[string]*Task, error) {
all := map[string]*Task{}
for _, task := range tasks {
all[task.ID] = NewTask(&task)
all[task.ID] = NewTaskWithParsing(&task, parseRules)
}
return all, nil
}
@ -325,7 +323,6 @@ func (c *Client) Get(ctx context.Context, id string) (*Task, error) {
if err != nil {
return nil, chronograf.ErrAlertNotFound
}
fmt.Println("!!!", task.ID, task.TemplateID)
return NewTask(&task), nil
}

View File

@ -1,6 +1,9 @@
package kapacitor
import (
"errors"
"regexp"
"strings"
"sync"
client "github.com/influxdata/kapacitor/client/v1"
@ -11,13 +14,10 @@ const (
// tasks from Kapacitor. This constant was chosen after some benchmarking
// work and should likely work well for quad-core systems
ListTaskWorkers = 4
// TaskGatherers is the number of workers collating responses from
// ListTaskWorkers. There can only be one without additional synchronization
// around the output buffer from ListTasks
TaskGatherers = 1
)
const maxLimit = 1_000_000
// ensure PaginatingKapaClient is a KapaClient
var _ KapaClient = &PaginatingKapaClient{}
@ -31,18 +31,21 @@ type PaginatingKapaClient struct {
// ListTasks lists all available tasks from Kapacitor, navigating pagination as
// it fetches them
func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) {
// only trigger auto-pagination with Offset=0 and Limit=0
if opts.Limit != 0 || opts.Offset != 0 {
return p.KapaClient.ListTasks(opts)
if opts.Pattern != "" && opts.Offset > 0 {
return nil, errors.New("offset paramater must be 0 when pattern parameter is supplied")
}
allTasks := []client.Task{}
allTasks := make([]client.Task, 0, p.FetchRate)
optChan := make(chan client.ListTasksOptions)
taskChan := make(chan []client.Task, ListTaskWorkers)
done := make(chan struct{})
var once sync.Once
readFinished := func() {
once.Do(func() {
close(done)
})
}
go p.generateKapacitorOptions(optChan, *opts, done)
@ -50,14 +53,41 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien
wg.Add(ListTaskWorkers)
for i := 0; i < ListTaskWorkers; i++ {
go p.fetchFromKapacitor(optChan, &wg, &once, taskChan, done)
go func() {
p.fetchFromKapacitor(optChan, taskChan, readFinished)
wg.Done()
}()
}
var gatherWg sync.WaitGroup
gatherWg.Add(TaskGatherers)
gatherWg.Add(1)
go func() {
for task := range taskChan {
allTasks = append(allTasks, task...)
// pos and limit are position within filtered results
// that are used with pattern searching
pos := -1
limit := opts.Limit
if limit <= 0 {
limit = maxLimit
}
processTasksBatches:
for tasks := range taskChan {
if opts.Pattern != "" {
for _, task := range tasks {
if strings.Contains(GetAlertRuleName(&task), opts.Pattern) {
pos++
if pos < opts.Offset {
continue
}
allTasks = append(allTasks, task)
if len(allTasks) == limit {
readFinished()
break processTasksBatches
}
}
}
continue
}
allTasks = append(allTasks, tasks...)
}
gatherWg.Done()
}()
@ -72,8 +102,7 @@ func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]clien
// fetchFromKapacitor fetches a set of results from a kapacitor by reading a
// set of options from the provided optChan. Fetched tasks are pushed onto the
// provided taskChan
func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, wg *sync.WaitGroup, closer *sync.Once, taskChan chan []client.Task, done chan struct{}) {
defer wg.Done()
func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, taskChan chan []client.Task, readFinished func()) {
for opt := range optChan {
resp, err := p.KapaClient.ListTasks(&opt)
if err != nil {
@ -82,9 +111,7 @@ func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksO
// break and stop all workers if we're done
if len(resp) == 0 {
closer.Do(func() {
close(done)
})
readFinished()
return
}
@ -96,18 +123,64 @@ func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksO
// generateKapacitorOptions creates ListTasksOptions with incrementally greater
// Limit and Offset parameters, and inserts them into the provided optChan
func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) {
// ensure Limit and Offset start from known quantities
opts.Limit = p.FetchRate
opts.Offset = 0
toFetchCount := opts.Limit
if toFetchCount <= 0 {
toFetchCount = maxLimit
}
// fetch all data when a pattern is set; chronograf filters by task name here,
// whereas kapacitor filters by task ID, which is hidden from chronograf users
if opts.Pattern != "" {
toFetchCount = maxLimit
opts.Pattern = ""
opts.Offset = 0
}
nextLimit := func() int {
if p.FetchRate < toFetchCount {
toFetchCount -= p.FetchRate
return p.FetchRate
}
retVal := toFetchCount
toFetchCount = 0
return retVal
}
opts.Limit = nextLimit()
opts.Pattern = ""
generateOpts:
for {
select {
case <-done:
close(optChan)
return
break generateOpts
case optChan <- opts:
// nop
}
opts.Offset = p.FetchRate + opts.Offset
if toFetchCount <= 0 {
// no more data to read from options
break generateOpts
}
opts.Offset = opts.Offset + p.FetchRate
opts.Limit = nextLimit()
}
close(optChan)
}
// reTaskName matches a TICKscript line of the form: var name = 'Rule Name'
var reTaskName = regexp.MustCompile(`[\r\n]*var[ \t]+name[ \t]+=[ \t]+'([^\n]+)'[ \r\t]*\n`)

// GetAlertRuleName extracts the chronograf rule name from a kapacitor task.
// Chronograf UI always prefers alert rule names over task IDs. Precedence:
// a non-empty string "name" task variable (#5403), then a `var name = '...'`
// assignment parsed out of the TICKscript, and finally the task ID.
func GetAlertRuleName(task *client.Task) string {
	// #5403: a "name" variable, when present and a non-empty string, wins.
	if v, ok := task.Vars["name"]; ok {
		if s, isStr := v.Value.(string); isStr && s != "" {
			return s
		}
	}
	// Otherwise look for a line such as: `var name = 'Rule Name'` and
	// unescape the single-quoted value (\' -> ' first, then \\ -> \).
	if m := reTaskName.FindStringSubmatch(task.TICKscript); m != nil {
		name := strings.ReplaceAll(m[1], "\\'", "'")
		return strings.ReplaceAll(name, "\\\\", "\\")
	}
	// Fall back to the task ID when no explicit name is available.
	return task.ID
}

View File

@ -1,6 +1,7 @@
package kapacitor_test
import (
"fmt"
"testing"
"github.com/influxdata/chronograf/kapacitor"
@ -17,7 +18,9 @@ func Test_Kapacitor_PaginatingKapaClient(t *testing.T) {
// create all the tasks
allTasks := []client.Task{}
for i := 0; i < lenAllTasks; i++ {
allTasks = append(allTasks, client.Task{})
allTasks = append(allTasks, client.Task{
ID: "id" + fmt.Sprintf("%3d", i),
})
}
begin := opts.Offset
end := opts.Offset + opts.Limit
@ -39,23 +42,144 @@ func Test_Kapacitor_PaginatingKapaClient(t *testing.T) {
FetchRate: 50,
}
opts := &client.ListTasksOptions{
Limit: 100,
Offset: 0,
t.Run("100 tasks returned when calling with limit", func(t *testing.T) {
opts := &client.ListTasksOptions{
Limit: 100,
Offset: 0,
}
// ensure 100 elems returned when calling mockClient directly
tasks, _ := pkap.ListTasks(opts)
if len(tasks) != 100 {
t.Error("Expected calling KapaClient's ListTasks to return", opts.Limit, "items. Received:", len(tasks))
}
})
t.Run("all tasks with 0 Limit", func(t *testing.T) {
opts := &client.ListTasksOptions{
Limit: 0,
Offset: 0,
}
tasks, _ := pkap.ListTasks(opts)
if len(tasks) != lenAllTasks {
t.Error("PaginatingKapaClient: Expected to find", lenAllTasks, "tasks but found", len(tasks))
}
opts.Offset = 100
tasks, _ = pkap.ListTasks(opts)
if len(tasks) != lenAllTasks-100 {
t.Error("PaginatingKapaClient: Expected to find", lenAllTasks-100, "tasks but found", len(tasks))
}
})
t.Run("tasks matching pattern", func(t *testing.T) {
opts := &client.ListTasksOptions{
Pattern: " ",
}
tasks, _ := pkap.ListTasks(opts)
if len(tasks) != 100 {
t.Error("PaginatingKapaClient: Expected to find 100 tasks but found", len(tasks))
}
opts = &client.ListTasksOptions{
Pattern: " ",
Limit: 20,
}
tasks, _ = pkap.ListTasks(opts)
if len(tasks) != 20 {
t.Error("PaginatingKapaClient: Expected to find 100 tasks but found", len(tasks))
}
opts = &client.ListTasksOptions{
Pattern: "id22",
Limit: 10,
}
tasks, _ = pkap.ListTasks(opts)
if len(tasks) != 7 {
t.Error("PaginatingKapaClient: Expected to find 7 matching task but found: ", len(tasks))
}
opts = &client.ListTasksOptions{
Pattern: "id22",
Limit: 1,
}
tasks, _ = pkap.ListTasks(opts)
if len(tasks) != 1 {
t.Error("PaginatingKapaClient: Expected to find 1 matching task but found: ", len(tasks))
}
opts = &client.ListTasksOptions{
Pattern: "id227",
}
tasks, _ = pkap.ListTasks(opts)
if len(tasks) != 0 {
t.Error("PaginatingKapaClient: Expected to find no matching task but found: ", len(tasks))
}
})
t.Run("zero offset required with pattern specified", func(t *testing.T) {
opts := &client.ListTasksOptions{
Pattern: " ",
Offset: 1,
}
_, err := pkap.ListTasks(opts)
if err == nil {
t.Error("PaginatingKapaClient: Error expected but no error returned")
}
})
}
func Test_Kapacitor_GetAlertRuleName(t *testing.T) {
tests := []struct {
Task client.Task
Name string
}{
{
Task: client.Task{},
Name: "",
},
{
Task: client.Task{
ID: "abcd",
},
Name: "abcd",
},
{
Task: client.Task{
ID: "abcd",
TICKscript: "var name = 'pavel'\n",
},
Name: "pavel",
},
{
Task: client.Task{
ID: "abcd",
TICKscript: "var name = 'pavel'\n",
Vars: client.Vars{
"name": {
Type: client.VarInt,
Value: 1,
},
},
},
Name: "pavel",
},
{
Task: client.Task{
ID: "abcd",
TICKscript: "var name = 'pavel'\n",
Vars: client.Vars{
"name": {
Type: client.VarString,
Value: "pepa",
},
},
},
Name: "pepa",
},
}
// ensure 100 elems returned when calling mockClient directly
tasks, _ := pkap.ListTasks(opts)
if len(tasks) != 100 {
t.Error("Expected calling KapaClient's ListTasks to return", opts.Limit, "items. Received:", len(tasks))
}
// ensure PaginatingKapaClient returns _all_ tasks with 0 value for Limit and Offset
allOpts := &client.ListTasksOptions{}
allTasks, _ := pkap.ListTasks(allOpts)
if len(allTasks) != lenAllTasks {
t.Error("PaginatingKapaClient: Expected to find", lenAllTasks, "tasks but found", len(allTasks))
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
retVal := kapacitor.GetAlertRuleName(&test.Task)
if retVal != test.Name {
t.Error("Expected: ", test.Name, " Received:", retVal)
}
})
}
}

View File

@ -7,11 +7,13 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/bouk/httprouter"
"github.com/influxdata/chronograf"
kapa "github.com/influxdata/chronograf/kapacitor"
"github.com/influxdata/kapacitor/client/v1"
)
type postKapacitorRequest struct {
@ -38,7 +40,7 @@ func (p *postKapacitorRequest) Valid(defaultOrgID string) error {
return fmt.Errorf("invalid source URI: %v", err)
}
if len(url.Scheme) == 0 {
return fmt.Errorf("Invalid URL; no URL scheme defined")
return fmt.Errorf("invalid URL; no URL scheme defined")
}
return nil
@ -248,7 +250,7 @@ func (p *patchKapacitorRequest) Valid() error {
return fmt.Errorf("invalid source URI: %v", err)
}
if len(url.Scheme) == 0 {
return fmt.Errorf("Invalid URL; no URL scheme defined")
return fmt.Errorf("invalid URL; no URL scheme defined")
}
}
return nil
@ -659,7 +661,7 @@ func (k *KapacitorStatus) Valid() error {
if k.Status == "enabled" || k.Status == "disabled" {
return nil
}
return fmt.Errorf("Invalid Kapacitor status: %s", k.Status)
return fmt.Errorf("invalid Kapacitor status: %s", k.Status)
}
// KapacitorRulesStatus proxies PATCH to kapacitor to enable/disable tasks
@ -730,6 +732,16 @@ func (s *Service) KapacitorRulesGet(w http.ResponseWriter, r *http.Request) {
Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
return
}
// parse parameters
params := r.URL.Query()
opts := client.ListTasksOptions{}
if _limit, err := strconv.ParseInt(params.Get("limit"), 0, 0); err == nil {
opts.Limit = int(_limit)
}
if _offset, err := strconv.ParseInt(params.Get("offset"), 0, 0); err == nil {
opts.Offset = int(_offset)
}
opts.Pattern = params.Get("pattern")
srcID, err := paramID("id", r)
if err != nil {
@ -745,7 +757,7 @@ func (s *Service) KapacitorRulesGet(w http.ResponseWriter, r *http.Request) {
}
c := kapa.NewClient(srv.URL, srv.Username, srv.Password, srv.InsecureSkipVerify)
tasks, err := c.All(ctx)
tasks, err := c.List(ctx, &opts, params.Get("parse") != "0")
if err != nil {
Error(w, http.StatusInternalServerError, err.Error(), s.Logger)
return

View File

@ -1476,6 +1476,27 @@
"type": "string",
"description": "ID of the kapacitor backend.",
"required": true
},
{
"name": "pattern",
"in": "query",
"type": "string",
"description": "filter results to contain the specified value in the task name",
"required": false
},
{
"name": "limit",
"in": "query",
"type": "number",
"description": "limits results length, 0 to return unlimited results",
"required": false
},
{
"name": "parse",
"in": "query",
"type": "string",
"description": "can be '0' to skip parsing of TICKscript",
"required": false
}
],
"responses": {