Fix null queryConfig and add substantial test coverage to kapacitor
storagepull/10616/head
parent
8772cecef7
commit
243286892f
|
@ -1333,6 +1333,103 @@ trigger
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test threshold lambda",
|
||||
script: `var db = '_internal'
|
||||
|
||||
var rp = 'monitor'
|
||||
|
||||
var measurement = 'cq'
|
||||
|
||||
var groupBy = []
|
||||
|
||||
var whereFilter = lambda: TRUE
|
||||
|
||||
var name = 'rule 1'
|
||||
|
||||
var idVar = name + ':{{.Group}}'
|
||||
|
||||
var message = ''
|
||||
|
||||
var idTag = 'alertID'
|
||||
|
||||
var levelTag = 'level'
|
||||
|
||||
var messageField = 'message'
|
||||
|
||||
var durationField = 'duration'
|
||||
|
||||
var outputDB = 'chronograf'
|
||||
|
||||
var outputRP = 'autogen'
|
||||
|
||||
var outputMeasurement = 'alerts'
|
||||
|
||||
var triggerType = 'threshold'
|
||||
|
||||
var crit = 90000
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupBy)
|
||||
.where(whereFilter)
|
||||
|eval(lambda: "queryOk")
|
||||
.as('value')
|
||||
|
||||
var trigger = data
|
||||
|alert()
|
||||
.crit(lambda: "value" > crit)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idTag)
|
||||
.levelTag(levelTag)
|
||||
.messageField(messageField)
|
||||
.durationField(durationField)
|
||||
|
||||
trigger
|
||||
|influxDBOut()
|
||||
.create()
|
||||
.database(outputDB)
|
||||
.retentionPolicy(outputRP)
|
||||
.measurement(outputMeasurement)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
|
||||
trigger
|
||||
|httpOut('output')
|
||||
`,
|
||||
want: chronograf.AlertRule{
|
||||
Name: "rule 1",
|
||||
Trigger: "threshold",
|
||||
Alerts: []string{},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Operator: "greater than",
|
||||
Value: "90000",
|
||||
},
|
||||
Every: "",
|
||||
Message: "",
|
||||
Details: "",
|
||||
Query: &chronograf.QueryConfig{
|
||||
Database: "_internal",
|
||||
RetentionPolicy: "monitor",
|
||||
Measurement: "cq",
|
||||
Fields: []chronograf.Field{
|
||||
{
|
||||
Field: "queryOk",
|
||||
Funcs: []string{},
|
||||
},
|
||||
},
|
||||
GroupBy: chronograf.GroupBy{
|
||||
Tags: []string{},
|
||||
},
|
||||
AreTagsAccepted: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
@ -1343,6 +1440,13 @@ trigger
|
|||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("Reverse() = \n%#v\n, want \n%#v\n", got, tt.want)
|
||||
if tt.want.Query != nil {
|
||||
if got.Query == nil {
|
||||
t.Errorf("Reverse() = got nil QueryConfig")
|
||||
} else if !reflect.DeepEqual(*got.Query, *tt.want.Query) {
|
||||
t.Errorf("Reverse() = QueryConfig not equal\n%#v\n, want \n%#v\n", *got.Query, *tt.want.Query)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -5,23 +5,46 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
"github.com/influxdata/chronograf/uuid"
|
||||
client "github.com/influxdata/kapacitor/client/v1"
|
||||
)
|
||||
|
||||
// Client communicates to kapacitor
|
||||
type Client struct {
|
||||
URL string
|
||||
Username string
|
||||
Password string
|
||||
ID chronograf.ID
|
||||
Ticker chronograf.Ticker
|
||||
}
|
||||
|
||||
const (
|
||||
// Prefix is prepended to the ID of all alerts
|
||||
Prefix = "chronograf-v1-"
|
||||
)
|
||||
|
||||
// Client communicates to kapacitor
|
||||
type Client struct {
|
||||
URL string
|
||||
Username string
|
||||
Password string
|
||||
ID chronograf.ID
|
||||
Ticker chronograf.Ticker
|
||||
kapaClient func(url, username, password string) (KapaClient, error)
|
||||
}
|
||||
|
||||
// KapaClient represents a connection to a kapacitor instance
|
||||
type KapaClient interface {
|
||||
CreateTask(opt client.CreateTaskOptions) (client.Task, error)
|
||||
Task(link client.Link, opt *client.TaskOptions) (client.Task, error)
|
||||
ListTasks(opt *client.ListTasksOptions) ([]client.Task, error)
|
||||
UpdateTask(link client.Link, opt client.UpdateTaskOptions) (client.Task, error)
|
||||
DeleteTask(link client.Link) error
|
||||
}
|
||||
|
||||
// NewClient creates a client that interfaces with Kapacitor tasks
|
||||
func NewClient(url, username, password string) *Client {
|
||||
return &Client{
|
||||
URL: url,
|
||||
Username: username,
|
||||
Password: password,
|
||||
ID: &uuid.V4{},
|
||||
Ticker: &Alert{},
|
||||
kapaClient: NewKapaClient,
|
||||
}
|
||||
}
|
||||
|
||||
// Task represents a running kapacitor task
|
||||
type Task struct {
|
||||
ID string // Kapacitor ID
|
||||
|
@ -43,7 +66,7 @@ func (c *Client) HrefOutput(ID string) string {
|
|||
|
||||
// Create builds and POSTs a tickscript to kapacitor
|
||||
func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -82,7 +105,7 @@ func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task,
|
|||
|
||||
// Delete removes tickscript task from kapacitor
|
||||
func (c *Client) Delete(ctx context.Context, href string) error {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -90,7 +113,7 @@ func (c *Client) Delete(ctx context.Context, href string) error {
|
|||
}
|
||||
|
||||
func (c *Client) updateStatus(ctx context.Context, href string, status client.TaskStatus) (*Task, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -124,7 +147,7 @@ func (c *Client) Enable(ctx context.Context, href string) (*Task, error) {
|
|||
|
||||
// AllStatus returns the status of all tasks in kapacitor
|
||||
func (c *Client) AllStatus(ctx context.Context) (map[string]string, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -148,7 +171,7 @@ func (c *Client) AllStatus(ctx context.Context) (map[string]string, error) {
|
|||
|
||||
// Status returns the status of a task in kapacitor
|
||||
func (c *Client) Status(ctx context.Context, href string) (string, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -162,7 +185,7 @@ func (c *Client) Status(ctx context.Context, href string) (string, error) {
|
|||
|
||||
// All returns all tasks in kapacitor
|
||||
func (c *Client) All(ctx context.Context) (map[string]chronograf.AlertRule, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -185,7 +208,6 @@ func (c *Client) All(ctx context.Context) (map[string]chronograf.AlertRule, erro
|
|||
}
|
||||
} else {
|
||||
rule.ID = task.ID
|
||||
rule.Query = nil
|
||||
rule.TICKScript = script
|
||||
alerts[task.ID] = rule
|
||||
}
|
||||
|
@ -195,7 +217,7 @@ func (c *Client) All(ctx context.Context) (map[string]chronograf.AlertRule, erro
|
|||
|
||||
// Get returns a single alert in kapacitor
|
||||
func (c *Client) Get(ctx context.Context, id string) (chronograf.AlertRule, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return chronograf.AlertRule{}, err
|
||||
}
|
||||
|
@ -222,7 +244,7 @@ func (c *Client) Get(ctx context.Context, id string) (chronograf.AlertRule, erro
|
|||
|
||||
// Update changes the tickscript of a given id.
|
||||
func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertRule) (*Task, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -264,25 +286,26 @@ func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertR
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) kapaClient(ctx context.Context) (*client.Client, error) {
|
||||
var creds *client.Credentials
|
||||
if c.Username != "" {
|
||||
creds = &client.Credentials{
|
||||
Method: client.UserAuthentication,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
}
|
||||
}
|
||||
|
||||
return client.New(client.Config{
|
||||
URL: c.URL,
|
||||
Credentials: creds,
|
||||
})
|
||||
}
|
||||
|
||||
func toTask(q *chronograf.QueryConfig) client.TaskType {
|
||||
if q == nil || q.RawText == nil || *q.RawText == "" {
|
||||
return client.StreamTask
|
||||
}
|
||||
return client.BatchTask
|
||||
}
|
||||
|
||||
// NewKapaClient creates a Kapacitor client connection
|
||||
func NewKapaClient(url, username, password string) (KapaClient, error) {
|
||||
var creds *client.Credentials
|
||||
if username != "" {
|
||||
creds = &client.Credentials{
|
||||
Method: client.UserAuthentication,
|
||||
Username: username,
|
||||
Password: password,
|
||||
}
|
||||
}
|
||||
|
||||
return client.New(client.Config{
|
||||
URL: url,
|
||||
Credentials: creds,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,745 @@
|
|||
package kapacitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
client "github.com/influxdata/kapacitor/client/v1"
|
||||
)
|
||||
|
||||
type MockKapa struct {
|
||||
ResTask client.Task
|
||||
ResTasks []client.Task
|
||||
Error error
|
||||
|
||||
client.CreateTaskOptions
|
||||
client.Link
|
||||
*client.TaskOptions
|
||||
*client.ListTasksOptions
|
||||
client.UpdateTaskOptions
|
||||
}
|
||||
|
||||
func (m *MockKapa) CreateTask(opt client.CreateTaskOptions) (client.Task, error) {
|
||||
m.CreateTaskOptions = opt
|
||||
return m.ResTask, m.Error
|
||||
}
|
||||
|
||||
func (m *MockKapa) Task(link client.Link, opt *client.TaskOptions) (client.Task, error) {
|
||||
m.Link = link
|
||||
m.TaskOptions = opt
|
||||
return m.ResTask, m.Error
|
||||
}
|
||||
|
||||
func (m *MockKapa) ListTasks(opt *client.ListTasksOptions) ([]client.Task, error) {
|
||||
m.ListTasksOptions = opt
|
||||
return m.ResTasks, m.Error
|
||||
}
|
||||
|
||||
func (m *MockKapa) UpdateTask(link client.Link, opt client.UpdateTaskOptions) (client.Task, error) {
|
||||
m.Link = link
|
||||
m.UpdateTaskOptions = opt
|
||||
return m.ResTask, m.Error
|
||||
}
|
||||
|
||||
func (m *MockKapa) DeleteTask(link client.Link) error {
|
||||
m.Link = link
|
||||
return m.Error
|
||||
}
|
||||
|
||||
func TestClient_AllStatus(t *testing.T) {
|
||||
type fields struct {
|
||||
URL string
|
||||
Username string
|
||||
Password string
|
||||
ID chronograf.ID
|
||||
Ticker chronograf.Ticker
|
||||
kapaClient func(url, username, password string) (KapaClient, error)
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
}
|
||||
kapa := &MockKapa{}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want map[string]string
|
||||
wantErr bool
|
||||
resTask client.Task
|
||||
resTasks []client.Task
|
||||
resError error
|
||||
|
||||
createTaskOptions client.CreateTaskOptions
|
||||
link client.Link
|
||||
taskOptions *client.TaskOptions
|
||||
listTasksOptions *client.ListTasksOptions
|
||||
updateTaskOptions client.UpdateTaskOptions
|
||||
}{
|
||||
{
|
||||
name: "return no tasks",
|
||||
fields: fields{
|
||||
URL: "http://hill-valley-preservation-society.org",
|
||||
Username: "ElsaRaven",
|
||||
Password: "save the clock tower",
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
listTasksOptions: &client.ListTasksOptions{
|
||||
Fields: []string{"status"},
|
||||
},
|
||||
want: map[string]string{},
|
||||
},
|
||||
{
|
||||
name: "return two tasks",
|
||||
fields: fields{
|
||||
URL: "http://hill-valley-preservation-society.org",
|
||||
Username: "ElsaRaven",
|
||||
Password: "save the clock tower",
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
listTasksOptions: &client.ListTasksOptions{
|
||||
Fields: []string{"status"},
|
||||
},
|
||||
resTasks: []client.Task{
|
||||
client.Task{
|
||||
ID: "howdy",
|
||||
Status: client.Enabled,
|
||||
},
|
||||
client.Task{
|
||||
ID: "doody",
|
||||
Status: client.Disabled,
|
||||
},
|
||||
},
|
||||
want: map[string]string{
|
||||
"howdy": "enabled",
|
||||
"doody": "disabled",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "list task error",
|
||||
fields: fields{
|
||||
URL: "http://hill-valley-preservation-society.org",
|
||||
Username: "ElsaRaven",
|
||||
Password: "save the clock tower",
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
listTasksOptions: &client.ListTasksOptions{
|
||||
Fields: []string{"status"},
|
||||
},
|
||||
resError: fmt.Errorf("this is an error"),
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
kapa.ResTask = tt.resTask
|
||||
kapa.ResTasks = tt.resTasks
|
||||
kapa.Error = tt.resError
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{
|
||||
URL: tt.fields.URL,
|
||||
Username: tt.fields.Username,
|
||||
Password: tt.fields.Password,
|
||||
ID: tt.fields.ID,
|
||||
Ticker: tt.fields.Ticker,
|
||||
kapaClient: tt.fields.kapaClient,
|
||||
}
|
||||
got, err := c.AllStatus(tt.args.ctx)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Client.AllStatus() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("Client.AllStatus() = %v, want %v", got, tt.want)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.CreateTaskOptions, tt.createTaskOptions) {
|
||||
t.Errorf("Client.AllStatus() = createTaskOptions %v, want %v", kapa.CreateTaskOptions, tt.createTaskOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.ListTasksOptions, tt.listTasksOptions) {
|
||||
t.Errorf("Client.AllStatus() = listTasksOptions %v, want %v", kapa.ListTasksOptions, tt.listTasksOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.TaskOptions, tt.taskOptions) {
|
||||
t.Errorf("Client.AllStatus() = taskOptions %v, want %v", kapa.TaskOptions, tt.taskOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.ListTasksOptions, tt.listTasksOptions) {
|
||||
t.Errorf("Client.AllStatus() = listTasksOptions %v, want %v", kapa.ListTasksOptions, tt.listTasksOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.Link, tt.link) {
|
||||
t.Errorf("Client.AllStatus() = Link %v, want %v", kapa.Link, tt.link)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_All(t *testing.T) {
|
||||
type fields struct {
|
||||
URL string
|
||||
Username string
|
||||
Password string
|
||||
ID chronograf.ID
|
||||
Ticker chronograf.Ticker
|
||||
kapaClient func(url, username, password string) (KapaClient, error)
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
}
|
||||
kapa := &MockKapa{}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want map[string]chronograf.AlertRule
|
||||
wantErr bool
|
||||
resTask client.Task
|
||||
resTasks []client.Task
|
||||
resError error
|
||||
|
||||
createTaskOptions client.CreateTaskOptions
|
||||
link client.Link
|
||||
taskOptions *client.TaskOptions
|
||||
listTasksOptions *client.ListTasksOptions
|
||||
updateTaskOptions client.UpdateTaskOptions
|
||||
}{
|
||||
{
|
||||
name: "return no tasks",
|
||||
fields: fields{
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
listTasksOptions: &client.ListTasksOptions{},
|
||||
want: map[string]chronograf.AlertRule{},
|
||||
},
|
||||
{
|
||||
name: "return a non-reversible task",
|
||||
fields: fields{
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
listTasksOptions: &client.ListTasksOptions{},
|
||||
resTasks: []client.Task{
|
||||
client.Task{
|
||||
ID: "howdy",
|
||||
Status: client.Enabled,
|
||||
},
|
||||
},
|
||||
want: map[string]chronograf.AlertRule{
|
||||
"howdy": chronograf.AlertRule{
|
||||
ID: "howdy",
|
||||
Name: "howdy",
|
||||
TICKScript: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "return a reversible task",
|
||||
fields: fields{
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
listTasksOptions: &client.ListTasksOptions{},
|
||||
resTasks: []client.Task{
|
||||
client.Task{
|
||||
ID: "rule 1",
|
||||
Status: client.Enabled,
|
||||
TICKscript: `var db = '_internal'
|
||||
|
||||
var rp = 'monitor'
|
||||
|
||||
var measurement = 'cq'
|
||||
|
||||
var groupBy = []
|
||||
|
||||
var whereFilter = lambda: TRUE
|
||||
|
||||
var name = 'rule 1'
|
||||
|
||||
var idVar = name + ':{{.Group}}'
|
||||
|
||||
var message = ''
|
||||
|
||||
var idTag = 'alertID'
|
||||
|
||||
var levelTag = 'level'
|
||||
|
||||
var messageField = 'message'
|
||||
|
||||
var durationField = 'duration'
|
||||
|
||||
var outputDB = 'chronograf'
|
||||
|
||||
var outputRP = 'autogen'
|
||||
|
||||
var outputMeasurement = 'alerts'
|
||||
|
||||
var triggerType = 'threshold'
|
||||
|
||||
var crit = 90000
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupBy)
|
||||
.where(whereFilter)
|
||||
|eval(lambda: "queryOk")
|
||||
.as('value')
|
||||
|
||||
var trigger = data
|
||||
|alert()
|
||||
.crit(lambda: "value" > crit)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idTag)
|
||||
.levelTag(levelTag)
|
||||
.messageField(messageField)
|
||||
.durationField(durationField)
|
||||
|
||||
trigger
|
||||
|influxDBOut()
|
||||
.create()
|
||||
.database(outputDB)
|
||||
.retentionPolicy(outputRP)
|
||||
.measurement(outputMeasurement)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
|
||||
trigger
|
||||
|httpOut('output')
|
||||
`,
|
||||
},
|
||||
},
|
||||
want: map[string]chronograf.AlertRule{
|
||||
"rule 1": chronograf.AlertRule{
|
||||
ID: "rule 1",
|
||||
Name: "rule 1",
|
||||
TICKScript: `var db = '_internal'
|
||||
|
||||
var rp = 'monitor'
|
||||
|
||||
var measurement = 'cq'
|
||||
|
||||
var groupBy = []
|
||||
|
||||
var whereFilter = lambda: TRUE
|
||||
|
||||
var name = 'rule 1'
|
||||
|
||||
var idVar = name + ':{{.Group}}'
|
||||
|
||||
var message = ''
|
||||
|
||||
var idTag = 'alertID'
|
||||
|
||||
var levelTag = 'level'
|
||||
|
||||
var messageField = 'message'
|
||||
|
||||
var durationField = 'duration'
|
||||
|
||||
var outputDB = 'chronograf'
|
||||
|
||||
var outputRP = 'autogen'
|
||||
|
||||
var outputMeasurement = 'alerts'
|
||||
|
||||
var triggerType = 'threshold'
|
||||
|
||||
var crit = 90000
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupBy)
|
||||
.where(whereFilter)
|
||||
|eval(lambda: "queryOk")
|
||||
.as('value')
|
||||
|
||||
var trigger = data
|
||||
|alert()
|
||||
.crit(lambda: "value" > crit)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idTag)
|
||||
.levelTag(levelTag)
|
||||
.messageField(messageField)
|
||||
.durationField(durationField)
|
||||
|
||||
trigger
|
||||
|influxDBOut()
|
||||
.create()
|
||||
.database(outputDB)
|
||||
.retentionPolicy(outputRP)
|
||||
.measurement(outputMeasurement)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
|
||||
trigger
|
||||
|httpOut('output')
|
||||
`,
|
||||
Trigger: "threshold",
|
||||
Alerts: []string{},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Operator: "greater than",
|
||||
Value: "90000",
|
||||
},
|
||||
Query: &chronograf.QueryConfig{
|
||||
Database: "_internal",
|
||||
RetentionPolicy: "monitor",
|
||||
Measurement: "cq",
|
||||
Fields: []chronograf.Field{
|
||||
{
|
||||
Field: "queryOk",
|
||||
Funcs: []string{},
|
||||
},
|
||||
},
|
||||
GroupBy: chronograf.GroupBy{
|
||||
Tags: []string{},
|
||||
},
|
||||
AreTagsAccepted: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
kapa.ResTask = tt.resTask
|
||||
kapa.ResTasks = tt.resTasks
|
||||
kapa.Error = tt.resError
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{
|
||||
URL: tt.fields.URL,
|
||||
Username: tt.fields.Username,
|
||||
Password: tt.fields.Password,
|
||||
ID: tt.fields.ID,
|
||||
Ticker: tt.fields.Ticker,
|
||||
kapaClient: tt.fields.kapaClient,
|
||||
}
|
||||
got, err := c.All(tt.args.ctx)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Client.All() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("Client.All() = %#v, want %#v", got, tt.want)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.CreateTaskOptions, tt.createTaskOptions) {
|
||||
t.Errorf("Client.All() = createTaskOptions %v, want %v", kapa.CreateTaskOptions, tt.createTaskOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.ListTasksOptions, tt.listTasksOptions) {
|
||||
t.Errorf("Client.All() = listTasksOptions %v, want %v", kapa.ListTasksOptions, tt.listTasksOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.TaskOptions, tt.taskOptions) {
|
||||
t.Errorf("Client.All() = taskOptions %v, want %v", kapa.TaskOptions, tt.taskOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.ListTasksOptions, tt.listTasksOptions) {
|
||||
t.Errorf("Client.All() = listTasksOptions %v, want %v", kapa.ListTasksOptions, tt.listTasksOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.Link, tt.link) {
|
||||
t.Errorf("Client.All() = Link %v, want %v", kapa.Link, tt.link)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_Get(t *testing.T) {
|
||||
type fields struct {
|
||||
URL string
|
||||
Username string
|
||||
Password string
|
||||
ID chronograf.ID
|
||||
Ticker chronograf.Ticker
|
||||
kapaClient func(url, username, password string) (KapaClient, error)
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
id string
|
||||
}
|
||||
kapa := &MockKapa{}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want chronograf.AlertRule
|
||||
wantErr bool
|
||||
resTask client.Task
|
||||
resTasks []client.Task
|
||||
resError error
|
||||
|
||||
createTaskOptions client.CreateTaskOptions
|
||||
link client.Link
|
||||
taskOptions *client.TaskOptions
|
||||
listTasksOptions *client.ListTasksOptions
|
||||
updateTaskOptions client.UpdateTaskOptions
|
||||
}{
|
||||
{
|
||||
name: "return no task",
|
||||
fields: fields{
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
id: "myid",
|
||||
},
|
||||
taskOptions: nil,
|
||||
wantErr: true,
|
||||
resError: fmt.Errorf("No such task"),
|
||||
link: client.Link{
|
||||
Href: "/kapacitor/v1/tasks/myid",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "return non-reversible task",
|
||||
fields: fields{
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
id: "myid",
|
||||
},
|
||||
taskOptions: nil,
|
||||
resTask: client.Task{
|
||||
ID: "myid",
|
||||
},
|
||||
want: chronograf.AlertRule{
|
||||
ID: "myid",
|
||||
Name: "myid",
|
||||
},
|
||||
link: client.Link{
|
||||
Href: "/kapacitor/v1/tasks/myid",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "return reversible task",
|
||||
fields: fields{
|
||||
kapaClient: func(url, username, password string) (KapaClient, error) {
|
||||
return kapa, nil
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
id: "rule 1",
|
||||
},
|
||||
taskOptions: nil,
|
||||
resTask: client.Task{
|
||||
ID: "rule 1",
|
||||
TICKscript: `var db = '_internal'
|
||||
|
||||
var rp = 'monitor'
|
||||
|
||||
var measurement = 'cq'
|
||||
|
||||
var groupBy = []
|
||||
|
||||
var whereFilter = lambda: TRUE
|
||||
|
||||
var name = 'rule 1'
|
||||
|
||||
var idVar = name + ':{{.Group}}'
|
||||
|
||||
var message = ''
|
||||
|
||||
var idTag = 'alertID'
|
||||
|
||||
var levelTag = 'level'
|
||||
|
||||
var messageField = 'message'
|
||||
|
||||
var durationField = 'duration'
|
||||
|
||||
var outputDB = 'chronograf'
|
||||
|
||||
var outputRP = 'autogen'
|
||||
|
||||
var outputMeasurement = 'alerts'
|
||||
|
||||
var triggerType = 'threshold'
|
||||
|
||||
var crit = 90000
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupBy)
|
||||
.where(whereFilter)
|
||||
|eval(lambda: "queryOk")
|
||||
.as('value')
|
||||
|
||||
var trigger = data
|
||||
|alert()
|
||||
.crit(lambda: "value" > crit)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idTag)
|
||||
.levelTag(levelTag)
|
||||
.messageField(messageField)
|
||||
.durationField(durationField)
|
||||
|
||||
trigger
|
||||
|influxDBOut()
|
||||
.create()
|
||||
.database(outputDB)
|
||||
.retentionPolicy(outputRP)
|
||||
.measurement(outputMeasurement)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
|
||||
trigger
|
||||
|httpOut('output')
|
||||
`,
|
||||
},
|
||||
want: chronograf.AlertRule{
|
||||
ID: "rule 1",
|
||||
Name: "rule 1",
|
||||
TICKScript: `var db = '_internal'
|
||||
|
||||
var rp = 'monitor'
|
||||
|
||||
var measurement = 'cq'
|
||||
|
||||
var groupBy = []
|
||||
|
||||
var whereFilter = lambda: TRUE
|
||||
|
||||
var name = 'rule 1'
|
||||
|
||||
var idVar = name + ':{{.Group}}'
|
||||
|
||||
var message = ''
|
||||
|
||||
var idTag = 'alertID'
|
||||
|
||||
var levelTag = 'level'
|
||||
|
||||
var messageField = 'message'
|
||||
|
||||
var durationField = 'duration'
|
||||
|
||||
var outputDB = 'chronograf'
|
||||
|
||||
var outputRP = 'autogen'
|
||||
|
||||
var outputMeasurement = 'alerts'
|
||||
|
||||
var triggerType = 'threshold'
|
||||
|
||||
var crit = 90000
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupBy)
|
||||
.where(whereFilter)
|
||||
|eval(lambda: "queryOk")
|
||||
.as('value')
|
||||
|
||||
var trigger = data
|
||||
|alert()
|
||||
.crit(lambda: "value" > crit)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idTag)
|
||||
.levelTag(levelTag)
|
||||
.messageField(messageField)
|
||||
.durationField(durationField)
|
||||
|
||||
trigger
|
||||
|influxDBOut()
|
||||
.create()
|
||||
.database(outputDB)
|
||||
.retentionPolicy(outputRP)
|
||||
.measurement(outputMeasurement)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
|
||||
trigger
|
||||
|httpOut('output')
|
||||
`,
|
||||
Trigger: "threshold",
|
||||
Alerts: []string{},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Operator: "greater than",
|
||||
Value: "90000",
|
||||
},
|
||||
Query: &chronograf.QueryConfig{
|
||||
Database: "_internal",
|
||||
RetentionPolicy: "monitor",
|
||||
Measurement: "cq",
|
||||
Fields: []chronograf.Field{
|
||||
{
|
||||
Field: "queryOk",
|
||||
Funcs: []string{},
|
||||
},
|
||||
},
|
||||
GroupBy: chronograf.GroupBy{
|
||||
Tags: []string{},
|
||||
},
|
||||
AreTagsAccepted: false,
|
||||
},
|
||||
},
|
||||
link: client.Link{
|
||||
Href: "/kapacitor/v1/tasks/rule 1",
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
kapa.ResTask = tt.resTask
|
||||
kapa.ResTasks = tt.resTasks
|
||||
kapa.Error = tt.resError
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{
|
||||
URL: tt.fields.URL,
|
||||
Username: tt.fields.Username,
|
||||
Password: tt.fields.Password,
|
||||
ID: tt.fields.ID,
|
||||
Ticker: tt.fields.Ticker,
|
||||
kapaClient: tt.fields.kapaClient,
|
||||
}
|
||||
got, err := c.Get(tt.args.ctx, tt.args.id)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Client.Get() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("Client.Get() =\n%#v\nwant\n%#v", got, tt.want)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.CreateTaskOptions, tt.createTaskOptions) {
|
||||
t.Errorf("Client.Get() = createTaskOptions %v, want %v", kapa.CreateTaskOptions, tt.createTaskOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.ListTasksOptions, tt.listTasksOptions) {
|
||||
t.Errorf("Client.Get() = listTasksOptions %v, want %v", kapa.ListTasksOptions, tt.listTasksOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.TaskOptions, tt.taskOptions) {
|
||||
t.Errorf("Client.Get() = taskOptions %v, want %v", kapa.TaskOptions, tt.taskOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.ListTasksOptions, tt.listTasksOptions) {
|
||||
t.Errorf("Client.Get() = listTasksOptions %v, want %v", kapa.ListTasksOptions, tt.listTasksOptions)
|
||||
}
|
||||
if !reflect.DeepEqual(kapa.Link, tt.link) {
|
||||
t.Errorf("Client.Get() = Link %v, want %v", kapa.Link, tt.link)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -9,7 +9,6 @@ import (
|
|||
"github.com/bouk/httprouter"
|
||||
"github.com/influxdata/chronograf"
|
||||
kapa "github.com/influxdata/chronograf/kapacitor"
|
||||
"github.com/influxdata/chronograf/uuid"
|
||||
)
|
||||
|
||||
type postKapacitorRequest struct {
|
||||
|
@ -300,13 +299,7 @@ func (h *Service) KapacitorRulesPost(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
c := kapa.Client{
|
||||
URL: srv.URL,
|
||||
Username: srv.Username,
|
||||
Password: srv.Password,
|
||||
Ticker: &kapa.Alert{},
|
||||
ID: &uuid.V4{},
|
||||
}
|
||||
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
|
||||
|
||||
var req chronograf.AlertRule
|
||||
if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
|
@ -427,12 +420,7 @@ func (h *Service) KapacitorRulesPut(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
tid := httprouter.GetParamFromContext(ctx, "tid")
|
||||
c := kapa.Client{
|
||||
URL: srv.URL,
|
||||
Username: srv.Username,
|
||||
Password: srv.Password,
|
||||
Ticker: &kapa.Alert{},
|
||||
}
|
||||
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
|
||||
var req chronograf.AlertRule
|
||||
if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
invalidJSON(w, h.Logger)
|
||||
|
@ -503,12 +491,8 @@ func (h *Service) KapacitorRulesStatus(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
tid := httprouter.GetParamFromContext(ctx, "tid")
|
||||
c := kapa.Client{
|
||||
URL: srv.URL,
|
||||
Username: srv.Username,
|
||||
Password: srv.Password,
|
||||
Ticker: &kapa.Alert{},
|
||||
}
|
||||
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
|
||||
|
||||
var req KapacitorStatus
|
||||
if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
invalidJSON(w, h.Logger)
|
||||
|
@ -567,13 +551,7 @@ func (h *Service) KapacitorRulesGet(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
ticker := &kapa.Alert{}
|
||||
c := kapa.Client{
|
||||
URL: srv.URL,
|
||||
Username: srv.Username,
|
||||
Password: srv.Password,
|
||||
Ticker: ticker,
|
||||
}
|
||||
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
|
||||
rules, err := c.All(ctx)
|
||||
if err != nil {
|
||||
Error(w, http.StatusInternalServerError, err.Error(), h.Logger)
|
||||
|
@ -627,13 +605,7 @@ func (h *Service) KapacitorRulesID(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
tid := httprouter.GetParamFromContext(ctx, "tid")
|
||||
|
||||
ticker := &kapa.Alert{}
|
||||
c := kapa.Client{
|
||||
URL: srv.URL,
|
||||
Username: srv.Username,
|
||||
Password: srv.Password,
|
||||
Ticker: ticker,
|
||||
}
|
||||
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
|
||||
|
||||
// Check if the rule exists within scope
|
||||
rule, err := c.Get(ctx, tid)
|
||||
|
@ -676,11 +648,7 @@ func (h *Service) KapacitorRulesDelete(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
c := kapa.Client{
|
||||
URL: srv.URL,
|
||||
Username: srv.Username,
|
||||
Password: srv.Password,
|
||||
}
|
||||
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
|
||||
|
||||
tid := httprouter.GetParamFromContext(ctx, "tid")
|
||||
// Check if the rule is linked to this server and kapacitor
|
||||
|
|
Loading…
Reference in New Issue