chronograf/kapacitor/kapa_client.go

187 lines
4.7 KiB
Go

package kapacitor
import (
"errors"
"regexp"
"strings"
"sync"
client "github.com/influxdata/kapacitor/client/v1"
)
const (
	// ListTaskWorkers is the number of goroutines that fetch task pages
	// from Kapacitor concurrently. The value was picked after some
	// benchmarking and should likely work well for quad-core systems.
	ListTaskWorkers = 4

	// maxLimit is the number of tasks assumed to be wanted when the
	// caller supplies no positive limit.
	maxLimit = 1_000_000
)
// compile-time check that *PaginatingKapaClient satisfies KapaClient
var _ KapaClient = (*PaginatingKapaClient)(nil)

// PaginatingKapaClient wraps a KapaClient and walks through Kapacitor's
// pagination on the caller's behalf so that all results are fetched.
type PaginatingKapaClient struct {
	KapaClient

	// FetchRate specifies the number of elements to fetch from Kapacitor
	// at a time.
	FetchRate int
}
// ListTasks lists all available tasks from Kapacitor, navigating pagination as
// it fetches them. Pages are requested concurrently by ListTaskWorkers
// workers, so the order of the returned tasks is not guaranteed.
//
// When opts.Pattern is set, all tasks are fetched and filtered here by alert
// rule name (see GetAlertRuleName); opts.Offset must then be 0 and opts.Limit
// caps the number of matches returned. A nil opts is treated as empty options.
//
// The first error reported by the wrapped client is returned, unless the
// requested number of matches had already been collected by then.
func (p *PaginatingKapaClient) ListTasks(opts *client.ListTasksOptions) ([]client.Task, error) {
	if opts == nil {
		opts = &client.ListTasksOptions{}
	}
	if opts.Pattern != "" && opts.Offset > 0 {
		return nil, errors.New("offset paramater must be 0 when pattern parameter is supplied")
	}

	// a negative capacity would make make() panic
	capacity := p.FetchRate
	if capacity < 0 {
		capacity = 0
	}
	allTasks := make([]client.Task, 0, capacity)

	optChan := make(chan client.ListTasksOptions)
	taskChan := make(chan []client.Task, ListTaskWorkers)
	done := make(chan struct{})

	var once sync.Once
	readFinished := func() {
		once.Do(func() {
			close(done)
		})
	}
	// Release the option generator on every exit path; without this it stays
	// blocked on optChan forever when the workers stop before it is finished.
	defer readFinished()

	go p.generateKapacitorOptions(optChan, *opts, done)

	var (
		wg       sync.WaitGroup
		errOnce  sync.Once
		fetchErr error // first worker error, read only after wg.Wait()
	)
	wg.Add(ListTaskWorkers)
	for i := 0; i < ListTaskWorkers; i++ {
		go func() {
			defer wg.Done()
			if err := p.fetchFromKapacitor(optChan, taskChan, readFinished); err != nil {
				errOnce.Do(func() { fetchErr = err })
			}
		}()
	}

	var gatherWg sync.WaitGroup
	gatherWg.Add(1)
	limitReached := false // written by the gatherer, read after gatherWg.Wait()
	go func() {
		defer gatherWg.Done()

		// pos and limit are position within filtered results
		// that are used with pattern searching
		pos := -1
		limit := opts.Limit
		if limit <= 0 {
			limit = maxLimit
		}

		for tasks := range taskChan {
			if limitReached {
				// Enough matches were collected. Keep receiving so that
				// workers blocked on sending their page can terminate.
				continue
			}
			if opts.Pattern == "" {
				allTasks = append(allTasks, tasks...)
				continue
			}
			for i := range tasks {
				if !strings.Contains(GetAlertRuleName(&tasks[i]), opts.Pattern) {
					continue
				}
				pos++
				if pos < opts.Offset {
					continue
				}
				allTasks = append(allTasks, tasks[i])
				if len(allTasks) == limit {
					limitReached = true
					readFinished()
					break
				}
			}
		}
	}()

	wg.Wait()
	close(taskChan)
	gatherWg.Wait()

	// An error from a page that was requested after the limit had been
	// reached does not make the collected result incomplete.
	if fetchErr != nil && !limitReached {
		return nil, fetchErr
	}
	return allTasks, nil
}

// fetchFromKapacitor fetches pages of results from Kapacitor by reading a
// set of options from the provided optChan. Fetched tasks are pushed onto the
// provided taskChan. The worker stops when optChan is closed, when a page
// comes back empty, or when a request fails; in the latter two cases it calls
// readFinished so that no further options are generated. The request error,
// if any, is returned.
func (p *PaginatingKapaClient) fetchFromKapacitor(optChan chan client.ListTasksOptions, taskChan chan []client.Task, readFinished func()) error {
	for opt := range optChan {
		resp, err := p.KapaClient.ListTasks(&opt)
		if err != nil {
			// stop producing further requests and report the failure
			readFinished()
			return err
		}

		// break and stop all workers if we're done
		if len(resp) == 0 {
			readFinished()
			return nil
		}

		// handoff tasks to consumer
		taskChan <- resp
	}
	return nil
}
// generateKapacitorOptions creates ListTasksOptions with incrementally greater
// Offset parameters, one per page, and inserts them into the provided optChan.
// It stops once the requested number of tasks is covered or done is closed,
// and closes optChan on exit.
//
// opts.Limit bounds the total number of tasks requested; a non-positive limit
// means maxLimit. When opts.Pattern is set, paging starts at offset 0 and
// covers maxLimit tasks, and the pattern is removed from the generated
// options.
func (p *PaginatingKapaClient) generateKapacitorOptions(optChan chan client.ListTasksOptions, opts client.ListTasksOptions, done chan struct{}) {
	// this goroutine is the only sender on optChan, so it closes it
	defer close(optChan)

	// fallbackFetchRate is the page size used when FetchRate is not positive.
	// Without a positive page size the offset never advances and the loop
	// below would produce requests forever.
	// NOTE(review): 100 is meant to mirror Kapacitor's default page size - confirm.
	const fallbackFetchRate = 100

	fetchRate := p.FetchRate
	if fetchRate <= 0 {
		fetchRate = fallbackFetchRate
	}

	remaining := opts.Limit
	if remaining <= 0 {
		remaining = maxLimit
	}

	// fetch all data when pattern is set, chronograf herein filters by task name
	// whereas kapacitor filters by task ID that is hidden to chronograf users
	if opts.Pattern != "" {
		remaining = maxLimit
		opts.Pattern = ""
		opts.Offset = 0
	}

	for remaining > 0 {
		// the last page may be smaller than the fetch rate
		opts.Limit = fetchRate
		if remaining < fetchRate {
			opts.Limit = remaining
		}
		remaining -= opts.Limit

		select {
		case <-done:
			return
		case optChan <- opts:
		}

		opts.Offset += fetchRate
	}
}
// reTaskName matches a TICKscript line such as `var name = 'Rule Name'`
// and captures the text between the outermost single quotes.
var reTaskName = regexp.MustCompile(`[\r\n]*var[ \t]+name[ \t]+=[ \t]+'([^\n]+)'[ \r\t]*\n`)

// GetAlertRuleName returns the chronograf rule name of a kapacitor task.
// The Chronograf UI always prefers alert rule names. The name is looked up,
// in order, in the task's "name" variable, in a `var name = '...'` line of
// the TICKscript, and finally falls back to the task ID.
func GetAlertRuleName(task *client.Task) string {
	// #5403 override name when defined in a variable
	if v, ok := task.Vars["name"]; ok {
		if s, ok := v.Value.(string); ok && s != "" {
			return s
		}
	}

	// try to parse the name from a line such as: `var name = 'Rule Name'`
	m := reTaskName.FindStringSubmatch(task.TICKscript)
	if m == nil {
		return task.ID
	}

	// undo the escaping: first escaped quotes, then escaped backslashes
	unquoted := strings.ReplaceAll(m[1], "\\'", "'")
	return strings.ReplaceAll(unquoted, "\\\\", "\\")
}