Update kapacitor generator to match with frontend UI changes.
parent
b99c721fa3
commit
60baf46fb7
|
@ -87,41 +87,16 @@ type SourcesStore interface {
|
|||
Update(context.Context, Source) error
|
||||
}
|
||||
|
||||
// QueryConfig represents UI query from the data explorer
|
||||
type QueryConfig struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Database string `json:"database"`
|
||||
Measurement string `json:"measurement"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
Fields []struct {
|
||||
Field string `json:"field"`
|
||||
Funcs []string `json:"funcs"`
|
||||
} `json:"fields"`
|
||||
Tags map[string][]string `json:"tags"`
|
||||
GroupBy struct {
|
||||
Time string `json:"time"`
|
||||
Tags []string `json:"tags"`
|
||||
} `json:"groupBy"`
|
||||
AreTagsAccepted bool `json:"areTagsAccepted"`
|
||||
RawText string `json:"rawText,omitempty"`
|
||||
}
|
||||
|
||||
// AlertRule represents rules for building a tickscript alerting task
|
||||
type AlertRule struct {
|
||||
ID string `json:"id,omitempty"` // ID is the unique ID of the alert
|
||||
Name string `json:"name"` // Name is the user-defined name for the alert
|
||||
Version string `json:"version"` // Version of the alert
|
||||
Query QueryConfig `json:"query"` // Query is the filter of data for the alert.
|
||||
Trigger string `json:"trigger"` // Trigger is a type that defines when to trigger the alert
|
||||
AlertServices []string `json:"alerts"` // AlertServices name all the services to notify (e.g. pagerduty)
|
||||
Type string `json:"type"` // Type specifies kind of AlertRule (stream, batch)
|
||||
Operator string `json:"operator"` // Operator for alert comparison
|
||||
Aggregate string `json:"aggregate"` // Statistic aggregate over window of data
|
||||
Period string `json:"period"` // Period is the window to search for alerting criteria
|
||||
Every string `json:"every"` // Every how often to check for the alerting criteria
|
||||
Critical string `json:"critical"` // Critical is the boundary value when alert goes critical
|
||||
Shift string `json:"shift"` // Shift is the amount of time to look into the past for the alert to compare to the present
|
||||
Message string `json:"message"` // Message included with alert
|
||||
ID string `json:"id,omitempty"` // ID is the unique ID of the alert
|
||||
Query QueryConfig `json:"query"` // Query is the filter of data for the alert.
|
||||
Every string `json:"every"` // Every how often to check for the alerting criteria
|
||||
Alerts []string `json:"alerts"` // AlertServices name all the services to notify (e.g. pagerduty)
|
||||
Message string `json:"message"` // Message included with alert
|
||||
Trigger string `json:"trigger"` // Trigger is a type that defines when to trigger the alert
|
||||
TriggerValues TriggerValues `json:"values"` // Defines the values that cause the alert to trigger
|
||||
Name string `json:"name"` // Name is the user-defined name for the alert
|
||||
}
|
||||
|
||||
// AlertRulesStore stores rules for building tickscript alerting tasks
|
||||
|
@ -147,6 +122,55 @@ type Ticker interface {
|
|||
Generate(AlertRule) (TICKScript, error)
|
||||
}
|
||||
|
||||
// DeadmanValue specifies the timeout duration of a deadman alert.
|
||||
type DeadmanValue struct {
|
||||
Period string `json:"period, omitempty"` // Period is the max time data can be missed before an alert
|
||||
}
|
||||
|
||||
// RelativeValue specifies the trigger logic for a relative value change alert.
|
||||
type RelativeValue struct {
|
||||
Change string `json:"change,omitempty"` // Change specifies if the change is a percent or absolute
|
||||
Period string `json:"period,omitempty"` // Period is the window to search for alerting criteria
|
||||
Shift string `json:"shift,omitempty"` // Shift is the amount of time to look into the past for the alert to compare to the present
|
||||
Operator string `json:"operator,omitempty"` // Operator for alert comparison
|
||||
Value string `json:"value,omitempty"` // Value is the boundary value when alert goes critical
|
||||
}
|
||||
|
||||
// ThresholdValue specifies the trigger logic for a threshold change alert.
|
||||
type ThresholdValue struct {
|
||||
Period string `json:"period,omitempty"` // Period is the window to search for the alerting criteria
|
||||
Operator string `json:"operator,omitempty"` // Operator for alert comparison
|
||||
Percentile string `json:"percentile,omitempty"` // Percentile is defined only when Relation is not "Once"
|
||||
Relation string `json:"relation,omitempty"` // Relation defines the logic about how often the threshold is met to be an alert.
|
||||
Value string `json:"value,omitempty"` // Value is the boundary value when alert goes critical
|
||||
}
|
||||
|
||||
// TriggerValues specifies which of the trigger types defines the alerting logic. One of these whould not be nil.
|
||||
type TriggerValues struct {
|
||||
Deadman *DeadmanValue `json:"deadman,omitempty"`
|
||||
Relative *RelativeValue `json:"relative,omitempty"`
|
||||
Threshold *ThresholdValue `json:"threshold,omitempty"`
|
||||
}
|
||||
|
||||
// QueryConfig represents UI query from the data explorer
|
||||
type QueryConfig struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Database string `json:"database"`
|
||||
Measurement string `json:"measurement"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
Fields []struct {
|
||||
Field string `json:"field"`
|
||||
Funcs []string `json:"funcs"`
|
||||
} `json:"fields"`
|
||||
Tags map[string][]string `json:"tags"`
|
||||
GroupBy struct {
|
||||
Time string `json:"time"`
|
||||
Tags []string `json:"tags"`
|
||||
} `json:"groupBy"`
|
||||
AreTagsAccepted bool `json:"areTagsAccepted"`
|
||||
RawText string `json:"rawText,omitempty"`
|
||||
}
|
||||
|
||||
// Server represents a proxy connection to an HTTP server
|
||||
type Server struct {
|
||||
ID int // ID is the unique ID of the server
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
// AlertServices generates alert chaining methods to be attached to an alert from all rule Services
|
||||
func AlertServices(rule chronograf.AlertRule) (string, error) {
|
||||
alert := ""
|
||||
for _, service := range rule.AlertServices {
|
||||
for _, service := range rule.Alerts {
|
||||
if err := ValidateAlert(service); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ func TestAlertServices(t *testing.T) {
|
|||
{
|
||||
name: "Test several valid services",
|
||||
rule: chronograf.AlertRule{
|
||||
AlertServices: []string{"slack", "victorOps", "email"},
|
||||
Alerts: []string{"slack", "victorOps", "email"},
|
||||
},
|
||||
want: `alert()
|
||||
.slack()
|
||||
|
@ -27,7 +27,7 @@ func TestAlertServices(t *testing.T) {
|
|||
{
|
||||
name: "Test single invalid services amongst several valid",
|
||||
rule: chronograf.AlertRule{
|
||||
AlertServices: []string{"slack", "invalid", "email"},
|
||||
Alerts: []string{"slack", "invalid", "email"},
|
||||
},
|
||||
want: ``,
|
||||
wantErr: true,
|
||||
|
@ -35,7 +35,7 @@ func TestAlertServices(t *testing.T) {
|
|||
{
|
||||
name: "Test single invalid service",
|
||||
rule: chronograf.AlertRule{
|
||||
AlertServices: []string{"invalid"},
|
||||
Alerts: []string{"invalid"},
|
||||
},
|
||||
want: ``,
|
||||
wantErr: true,
|
||||
|
@ -43,7 +43,7 @@ func TestAlertServices(t *testing.T) {
|
|||
{
|
||||
name: "Test single valid service",
|
||||
rule: chronograf.AlertRule{
|
||||
AlertServices: []string{"slack"},
|
||||
Alerts: []string{"slack"},
|
||||
},
|
||||
want: `alert()
|
||||
.slack()
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
client "github.com/influxdata/kapacitor/client/v1"
|
||||
)
|
||||
|
||||
// Client communicates to kapacitor
|
||||
type Client struct {
|
||||
URL string
|
||||
Username string
|
||||
|
@ -17,8 +18,8 @@ type Client struct {
|
|||
}
|
||||
|
||||
const (
|
||||
Prefix = "chronograf-v1-"
|
||||
Pattern = "chronograf-v1-*"
|
||||
// Prefix is prepended to the ID of all alerts
|
||||
Prefix = "chronograf-v1-"
|
||||
)
|
||||
|
||||
// Task represents a running kapacitor task
|
||||
|
@ -28,10 +29,12 @@ type Task struct {
|
|||
TICKScript chronograf.TICKScript // TICKScript is the running script
|
||||
}
|
||||
|
||||
// Href returns the link to a kapacitor task given an id
|
||||
func (c *Client) Href(ID string) string {
|
||||
return fmt.Sprintf("/kapacitor/v1/tasks/%s", ID)
|
||||
}
|
||||
|
||||
// Create builds and POSTs a tickscript to kapacitor
|
||||
func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
if err != nil {
|
||||
|
@ -48,15 +51,10 @@ func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
taskType, err := toTask(rule.Type)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kapaID := Prefix + id
|
||||
task, err := kapa.CreateTask(client.CreateTaskOptions{
|
||||
ID: kapaID,
|
||||
Type: taskType,
|
||||
Type: toTask(rule.Query),
|
||||
DBRPs: []client.DBRP{{Database: rule.Query.Database, RetentionPolicy: rule.Query.RetentionPolicy}},
|
||||
TICKscript: string(script),
|
||||
Status: client.Enabled,
|
||||
|
@ -72,6 +70,7 @@ func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task,
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Delete removes tickscript task from kapacitor
|
||||
func (c *Client) Delete(ctx context.Context, href string) error {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
if err != nil {
|
||||
|
@ -80,6 +79,7 @@ func (c *Client) Delete(ctx context.Context, href string) error {
|
|||
return kapa.DeleteTask(client.Link{Href: href})
|
||||
}
|
||||
|
||||
// Update changes the tickscript of a given id.
|
||||
func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertRule) (*Task, error) {
|
||||
kapa, err := c.kapaClient(ctx)
|
||||
if err != nil {
|
||||
|
@ -91,15 +91,10 @@ func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertR
|
|||
return nil, err
|
||||
}
|
||||
|
||||
taskType, err := toTask(rule.Type)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts := client.UpdateTaskOptions{
|
||||
TICKscript: string(script),
|
||||
Status: client.Enabled,
|
||||
Type: taskType,
|
||||
Type: toTask(rule.Query),
|
||||
DBRPs: []client.DBRP{
|
||||
{
|
||||
Database: rule.Query.Database,
|
||||
|
@ -135,12 +130,10 @@ func (c *Client) kapaClient(ctx context.Context) (*client.Client, error) {
|
|||
Credentials: creds,
|
||||
})
|
||||
}
|
||||
func toTask(taskType string) (client.TaskType, error) {
|
||||
if taskType == "stream" {
|
||||
return client.StreamTask, nil
|
||||
} else if taskType == "batch" {
|
||||
return client.BatchTask, nil
|
||||
} else {
|
||||
return 0, fmt.Errorf("Unknown alert type %s", taskType)
|
||||
|
||||
func toTask(q chronograf.QueryConfig) client.TaskType {
|
||||
if q.RawText == "" {
|
||||
return client.StreamTask
|
||||
}
|
||||
return client.BatchTask
|
||||
}
|
||||
|
|
|
@ -33,17 +33,21 @@ func Data(rule chronograf.AlertRule) (string, error) {
|
|||
stream = fmt.Sprintf("%s\n.groupBy(groupby)\n", stream)
|
||||
stream = stream + ".where(where_filter)\n"
|
||||
// Only need aggregate functions for threshold and relative
|
||||
|
||||
value := ""
|
||||
if rule.Trigger != "deadman" {
|
||||
for _, field := range rule.Query.Fields {
|
||||
for _, fnc := range field.Funcs {
|
||||
// Only need a window if we have an aggregate function
|
||||
stream = stream + "|window().period(period).every(every).align()\n"
|
||||
stream = stream + fmt.Sprintf(`|%s(field).as(metric)`, fnc)
|
||||
value = value + "|window().period(period).every(every).align()\n"
|
||||
value = value + fmt.Sprintf(`|%s(field).as(value)`, fnc)
|
||||
break // only support a single field
|
||||
}
|
||||
break // only support a single field
|
||||
}
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
if value == "" {
|
||||
value = `|eval(lambda: field).as(value)`
|
||||
}
|
||||
return stream + value, nil
|
||||
}
|
||||
|
|
|
@ -44,18 +44,17 @@ func TestData(t *testing.T) {
|
|||
t.Errorf("Error unmarshaling %v", err)
|
||||
}
|
||||
alert := chronograf.AlertRule{
|
||||
Type: "deadman",
|
||||
Query: q,
|
||||
Trigger: "deadman",
|
||||
Query: q,
|
||||
}
|
||||
if tick, err := Data(alert); err != nil {
|
||||
t.Errorf("Error creating tick %v", err)
|
||||
} else {
|
||||
formatted, err := formatTick(tick)
|
||||
_, err := formatTick(tick)
|
||||
if err != nil {
|
||||
fmt.Printf(tick)
|
||||
t.Errorf("Error formatting tick %v", err)
|
||||
}
|
||||
fmt.Printf("%s", formatted)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ func InfluxOut(rule chronograf.AlertRule) string {
|
|||
.database(output_db)
|
||||
.retentionPolicy(output_rp)
|
||||
.measurement(output_mt)
|
||||
.tag('name', name)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
`
|
||||
}
|
||||
|
|
|
@ -16,15 +16,15 @@ func TestInfluxOut(t *testing.T) {
|
|||
.database(output_db)
|
||||
.retentionPolicy(output_rp)
|
||||
.measurement(output_mt)
|
||||
.tag('name', name)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
`,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got := InfluxOut(chronograf.AlertRule{
|
||||
Name: "name",
|
||||
Type: "deadman",
|
||||
Name: "name",
|
||||
Trigger: "deadman",
|
||||
})
|
||||
formatted, err := formatTick(got)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,78 +1 @@
|
|||
package kapacitor
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"text/template"
|
||||
)
|
||||
|
||||
// ThresholdTrigger is the trickscript trigger for alerts that exceed a value
|
||||
var ThresholdTrigger = `
|
||||
var trigger = data|{{ .Aggregate }}(metric)
|
||||
.as('value')
|
||||
|alert()
|
||||
.stateChangesOnly()
|
||||
.crit(lambda: "value" {{ .Operator }} crit)
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`
|
||||
|
||||
// RelativeTrigger compares one window of data versus another.
|
||||
var RelativeTrigger = `
|
||||
var past = data
|
||||
|{{ .Aggregate }}(metric)
|
||||
.as('stat')
|
||||
|shift(shift)
|
||||
|
||||
var current = data
|
||||
|{{ .Aggregate }}(metric)
|
||||
.as('stat')
|
||||
|
||||
var trigger = past
|
||||
|join(current)
|
||||
.as('past', 'current')
|
||||
|eval(lambda: abs(float("current.stat" - "past.stat"))/float("past.stat"))
|
||||
.keep()
|
||||
.as('value')
|
||||
|alert()
|
||||
.stateChangesOnly()
|
||||
.crit(lambda: "value" {{ .Operator }} crit)
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`
|
||||
|
||||
// DeadmanTrigger checks if any data has been streamed in the last period of time
|
||||
var DeadmanTrigger = `
|
||||
var trigger = data|deadman(threshold, every)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`
|
||||
|
||||
func execTemplate(tick string, alert interface{}) (string, error) {
|
||||
p := template.New("template")
|
||||
t, err := p.Parse(tick)
|
||||
if err != nil {
|
||||
log.Fatalf("template parse: %s", err)
|
||||
return "", err
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
err = t.Execute(buf, alert)
|
||||
if err != nil {
|
||||
log.Fatalf("template execution: %s", err)
|
||||
return "", err
|
||||
}
|
||||
return buf.String(), nil
|
||||
}
|
||||
|
|
|
@ -9,17 +9,19 @@ import (
|
|||
|
||||
func TestGenerate(t *testing.T) {
|
||||
alert := chronograf.AlertRule{
|
||||
Name: "name",
|
||||
Version: "1.0",
|
||||
Trigger: "relative",
|
||||
AlertServices: []string{"slack", "victorOps", "email"},
|
||||
Type: "stream",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
Period: "10m",
|
||||
Every: "30s",
|
||||
Critical: "90",
|
||||
Shift: "1m",
|
||||
Name: "name",
|
||||
Trigger: "relative",
|
||||
Alerts: []string{"slack", "victorOps", "email"},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Relative: &chronograf.RelativeValue{
|
||||
Change: "change",
|
||||
Period: "10m",
|
||||
Shift: "1m",
|
||||
Operator: "greater than",
|
||||
Value: "90",
|
||||
},
|
||||
},
|
||||
Every: "30s",
|
||||
Query: chronograf.QueryConfig{
|
||||
Database: "telegraf",
|
||||
Measurement: "cpu",
|
||||
|
@ -63,18 +65,20 @@ func TestGenerate(t *testing.T) {
|
|||
|
||||
func TestThreshold(t *testing.T) {
|
||||
alert := chronograf.AlertRule{
|
||||
Name: "name",
|
||||
Version: "1.0",
|
||||
Trigger: "threshold",
|
||||
AlertServices: []string{"slack", "victorOps", "email"},
|
||||
Type: "stream",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
Period: "10m",
|
||||
Every: "30s",
|
||||
Critical: "90",
|
||||
Shift: "1m",
|
||||
Message: "message",
|
||||
Name: "name",
|
||||
Trigger: "threshold",
|
||||
Alerts: []string{"slack", "victorOps", "email"},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Threshold: &chronograf.ThresholdValue{
|
||||
Relation: "once",
|
||||
Period: "10m",
|
||||
Percentile: "", // TODO: if relation is not once then this will have a number
|
||||
Operator: "greater than",
|
||||
Value: "90",
|
||||
},
|
||||
},
|
||||
Every: "30s",
|
||||
Message: "message",
|
||||
Query: chronograf.QueryConfig{
|
||||
Database: "telegraf",
|
||||
Measurement: "cpu",
|
||||
|
@ -130,8 +134,6 @@ var groupby = ['host', 'cluster_id']
|
|||
|
||||
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
||||
var period = 10m
|
||||
|
||||
var every = 30s
|
||||
|
||||
var name = 'name'
|
||||
|
@ -142,13 +144,13 @@ var message = 'message'
|
|||
|
||||
var idtag = 'alertID'
|
||||
|
||||
var levelfield = 'level'
|
||||
var leveltag = 'level'
|
||||
|
||||
var messagefield = 'message'
|
||||
|
||||
var durationfield = 'duration'
|
||||
|
||||
var metric = 'metric'
|
||||
var value = 'value'
|
||||
|
||||
var output_db = 'chronograf'
|
||||
|
||||
|
@ -158,6 +160,8 @@ var output_mt = 'alerts'
|
|||
|
||||
var triggerType = 'threshold'
|
||||
|
||||
var period = 10m
|
||||
|
||||
var crit = 90
|
||||
|
||||
var data = stream
|
||||
|
@ -172,18 +176,16 @@ var data = stream
|
|||
.every(every)
|
||||
.align()
|
||||
|mean(field)
|
||||
.as(metric)
|
||||
.as(value)
|
||||
|
||||
var trigger = data
|
||||
|mean(metric)
|
||||
.as('value')
|
||||
|alert()
|
||||
.stateChangesOnly()
|
||||
.crit(lambda: "value" > crit)
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
.slack()
|
||||
|
@ -196,7 +198,7 @@ trigger
|
|||
.database(output_db)
|
||||
.retentionPolicy(output_rp)
|
||||
.measurement(output_mt)
|
||||
.tag('name', name)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
`,
|
||||
wantErr: false,
|
||||
|
@ -210,6 +212,7 @@ trigger
|
|||
continue
|
||||
}
|
||||
if got != tt.want {
|
||||
fmt.Printf("%s", got)
|
||||
t.Errorf("%q. Threshold() = %v, want %v", tt.name, got, tt.want)
|
||||
}
|
||||
}
|
||||
|
@ -217,18 +220,20 @@ trigger
|
|||
|
||||
func TestRelative(t *testing.T) {
|
||||
alert := chronograf.AlertRule{
|
||||
Name: "name",
|
||||
Version: "1.0",
|
||||
Trigger: "relative",
|
||||
AlertServices: []string{"slack", "victorOps", "email"},
|
||||
Type: "stream",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
Period: "10m",
|
||||
Every: "30s",
|
||||
Critical: "90",
|
||||
Shift: "1m",
|
||||
Message: "message",
|
||||
Name: "name",
|
||||
Trigger: "relative",
|
||||
Alerts: []string{"slack", "victorOps", "email"},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Relative: &chronograf.RelativeValue{
|
||||
Change: "change",
|
||||
Period: "10m",
|
||||
Shift: "1m",
|
||||
Operator: "greater than",
|
||||
Value: "90",
|
||||
},
|
||||
},
|
||||
Every: "30s",
|
||||
Message: "message",
|
||||
Query: chronograf.QueryConfig{
|
||||
Database: "telegraf",
|
||||
Measurement: "cpu",
|
||||
|
@ -284,8 +289,6 @@ var groupby = ['host', 'cluster_id']
|
|||
|
||||
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
||||
var period = 10m
|
||||
|
||||
var every = 30s
|
||||
|
||||
var name = 'name'
|
||||
|
@ -296,13 +299,13 @@ var message = 'message'
|
|||
|
||||
var idtag = 'alertID'
|
||||
|
||||
var levelfield = 'level'
|
||||
var leveltag = 'level'
|
||||
|
||||
var messagefield = 'message'
|
||||
|
||||
var durationfield = 'duration'
|
||||
|
||||
var metric = 'metric'
|
||||
var value = 'value'
|
||||
|
||||
var output_db = 'chronograf'
|
||||
|
||||
|
@ -312,6 +315,8 @@ var output_mt = 'alerts'
|
|||
|
||||
var triggerType = 'relative'
|
||||
|
||||
var period = 10m
|
||||
|
||||
var shift = -1m
|
||||
|
||||
var crit = 90
|
||||
|
@ -328,21 +333,17 @@ var data = stream
|
|||
.every(every)
|
||||
.align()
|
||||
|mean(field)
|
||||
.as(metric)
|
||||
.as(value)
|
||||
|
||||
var past = data
|
||||
|mean(metric)
|
||||
.as('stat')
|
||||
|shift(shift)
|
||||
|
||||
var current = data
|
||||
|mean(metric)
|
||||
.as('stat')
|
||||
|
||||
var trigger = past
|
||||
|join(current)
|
||||
.as('past', 'current')
|
||||
|eval(lambda: abs(float("current.stat" - "past.stat")) / float("past.stat"))
|
||||
|eval(lambda: abs(float("current.value" - "past.value")) / float("past.value"))
|
||||
.keep()
|
||||
.as('value')
|
||||
|alert()
|
||||
|
@ -351,7 +352,7 @@ var trigger = past
|
|||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
.slack()
|
||||
|
@ -364,7 +365,7 @@ trigger
|
|||
.database(output_db)
|
||||
.retentionPolicy(output_rp)
|
||||
.measurement(output_mt)
|
||||
.tag('name', name)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
`,
|
||||
wantErr: false,
|
||||
|
@ -378,6 +379,7 @@ trigger
|
|||
continue
|
||||
}
|
||||
if got != tt.want {
|
||||
fmt.Printf("%s", got)
|
||||
t.Errorf("%q. Relative() = %v, want %v", tt.name, got, tt.want)
|
||||
}
|
||||
}
|
||||
|
@ -385,18 +387,16 @@ trigger
|
|||
|
||||
func TestDeadman(t *testing.T) {
|
||||
alert := chronograf.AlertRule{
|
||||
Name: "name",
|
||||
Version: "1.0",
|
||||
Trigger: "deadman",
|
||||
AlertServices: []string{"slack", "victorOps", "email"},
|
||||
Type: "stream",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
Period: "10m",
|
||||
Every: "30s",
|
||||
Critical: "90",
|
||||
Shift: "1m",
|
||||
Message: "message",
|
||||
Name: "name",
|
||||
Trigger: "deadman",
|
||||
Alerts: []string{"slack", "victorOps", "email"},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Deadman: &chronograf.DeadmanValue{
|
||||
Period: "10m",
|
||||
},
|
||||
},
|
||||
Every: "30s",
|
||||
Message: "message",
|
||||
Query: chronograf.QueryConfig{
|
||||
Database: "telegraf",
|
||||
Measurement: "cpu",
|
||||
|
@ -452,8 +452,6 @@ var groupby = ['host', 'cluster_id']
|
|||
|
||||
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
||||
var period = 10m
|
||||
|
||||
var every = 30s
|
||||
|
||||
var name = 'name'
|
||||
|
@ -464,13 +462,13 @@ var message = 'message'
|
|||
|
||||
var idtag = 'alertID'
|
||||
|
||||
var levelfield = 'level'
|
||||
var leveltag = 'level'
|
||||
|
||||
var messagefield = 'message'
|
||||
|
||||
var durationfield = 'duration'
|
||||
|
||||
var metric = 'usage_user'
|
||||
var value = 'value'
|
||||
|
||||
var output_db = 'chronograf'
|
||||
|
||||
|
@ -482,6 +480,8 @@ var triggerType = 'deadman'
|
|||
|
||||
var threshold = 0.0
|
||||
|
||||
var period = 10m
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
|
@ -489,6 +489,8 @@ var data = stream
|
|||
.measurement(measurement)
|
||||
.groupBy(groupby)
|
||||
.where(where_filter)
|
||||
|eval(lambda: field)
|
||||
.as(value)
|
||||
|
||||
var trigger = data
|
||||
|deadman(threshold, every)
|
||||
|
@ -496,7 +498,7 @@ var trigger = data
|
|||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
.slack()
|
||||
|
@ -509,7 +511,7 @@ trigger
|
|||
.database(output_db)
|
||||
.retentionPolicy(output_rp)
|
||||
.measurement(output_mt)
|
||||
.tag('name', name)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
`,
|
||||
wantErr: false,
|
||||
|
@ -523,6 +525,7 @@ trigger
|
|||
continue
|
||||
}
|
||||
if got != tt.want {
|
||||
fmt.Printf("%s", got)
|
||||
t.Errorf("%q. Deadman() = %v, want %v", tt.name, got, tt.want)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,16 +3,94 @@ package kapacitor
|
|||
import "github.com/influxdata/chronograf"
|
||||
import "fmt"
|
||||
|
||||
// ThresholdTrigger is the trickscript trigger for alerts that exceed a value
|
||||
var ThresholdTrigger = `
|
||||
var trigger = data
|
||||
|alert()
|
||||
.stateChangesOnly()
|
||||
.crit(lambda: "value" %s crit)
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`
|
||||
|
||||
// RelativeTrigger compares one window of data versus another.
|
||||
var RelativeTrigger = `
|
||||
var past = data
|
||||
|shift(shift)
|
||||
|
||||
var current = data
|
||||
|
||||
var trigger = past
|
||||
|join(current)
|
||||
.as('past', 'current')
|
||||
|eval(lambda: abs(float("current.value" - "past.value"))/float("past.value"))
|
||||
.keep()
|
||||
.as('value')
|
||||
|alert()
|
||||
.stateChangesOnly()
|
||||
.crit(lambda: "value" %s crit)
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`
|
||||
|
||||
// DeadmanTrigger checks if any data has been streamed in the last period of time
|
||||
var DeadmanTrigger = `
|
||||
var trigger = data|deadman(threshold, every)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`
|
||||
|
||||
// Trigger returns the trigger mechanism for a tickscript
|
||||
func Trigger(rule chronograf.AlertRule) (string, error) {
|
||||
switch rule.Trigger {
|
||||
case "deadman":
|
||||
return DeadmanTrigger, nil
|
||||
case "relative":
|
||||
return execTemplate(RelativeTrigger, rule)
|
||||
op, err := kapaOperator(rule.TriggerValues.Relative.Operator)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf(RelativeTrigger, op), nil
|
||||
case "threshold":
|
||||
return execTemplate(ThresholdTrigger, rule)
|
||||
op, err := kapaOperator(rule.TriggerValues.Threshold.Operator)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf(ThresholdTrigger, op), nil
|
||||
default:
|
||||
return "", fmt.Errorf("Unknown trigger type: %s", rule.Trigger)
|
||||
}
|
||||
}
|
||||
|
||||
// kapaOperator converts UI strings to kapacitor operators
|
||||
func kapaOperator(operator string) (string, error) {
|
||||
switch operator {
|
||||
case "greater than":
|
||||
return ">", nil
|
||||
case "less than":
|
||||
return "<", nil
|
||||
case "equal to or less than":
|
||||
return "<=", nil
|
||||
case "equal to or greater than":
|
||||
return ">=", nil
|
||||
case "equal":
|
||||
return "==", nil
|
||||
case "not equal":
|
||||
return "!=", nil
|
||||
default:
|
||||
return "", fmt.Errorf("invalid operator: %s is unknown", operator)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,9 +16,7 @@ func TestTrigger(t *testing.T) {
|
|||
{
|
||||
name: "Test Deadman",
|
||||
rule: chronograf.AlertRule{
|
||||
Trigger: "deadman",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
Trigger: "deadman",
|
||||
},
|
||||
want: `var trigger = data
|
||||
|deadman(threshold, every)
|
||||
|
@ -26,7 +24,7 @@ func TestTrigger(t *testing.T) {
|
|||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`,
|
||||
|
@ -35,23 +33,22 @@ func TestTrigger(t *testing.T) {
|
|||
{
|
||||
name: "Test Relative",
|
||||
rule: chronograf.AlertRule{
|
||||
Trigger: "relative",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
Trigger: "relative",
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Relative: &chronograf.RelativeValue{
|
||||
Operator: "greater than",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: `var past = data
|
||||
|mean(metric)
|
||||
.as('stat')
|
||||
|shift(shift)
|
||||
|
||||
var current = data
|
||||
|mean(metric)
|
||||
.as('stat')
|
||||
|
||||
var trigger = past
|
||||
|join(current)
|
||||
.as('past', 'current')
|
||||
|eval(lambda: abs(float("current.stat" - "past.stat")) / float("past.stat"))
|
||||
|eval(lambda: abs(float("current.value" - "past.value")) / float("past.value"))
|
||||
.keep()
|
||||
.as('value')
|
||||
|alert()
|
||||
|
@ -60,7 +57,7 @@ var trigger = past
|
|||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`,
|
||||
|
@ -69,20 +66,21 @@ var trigger = past
|
|||
{
|
||||
name: "Test Threshold",
|
||||
rule: chronograf.AlertRule{
|
||||
Trigger: "threshold",
|
||||
Operator: ">",
|
||||
Aggregate: "median",
|
||||
Trigger: "threshold",
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Threshold: &chronograf.ThresholdValue{
|
||||
Operator: "greater than",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: `var trigger = data
|
||||
|median(metric)
|
||||
.as('value')
|
||||
|alert()
|
||||
.stateChangesOnly()
|
||||
.crit(lambda: "value" > crit)
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idtag)
|
||||
.levelField(levelfield)
|
||||
.levelTag(leveltag)
|
||||
.messageField(messagefield)
|
||||
.durationField(durationfield)
|
||||
`,
|
||||
|
@ -91,7 +89,7 @@ var trigger = past
|
|||
{
|
||||
name: "Test Invalid",
|
||||
rule: chronograf.AlertRule{
|
||||
Type: "invalid",
|
||||
Trigger: "invalid",
|
||||
},
|
||||
want: ``,
|
||||
wantErr: true,
|
||||
|
|
|
@ -17,8 +17,8 @@ var (
|
|||
Measurement = "alerts"
|
||||
// IDTag is the output tag key for the ID of the alert
|
||||
IDTag = "alertID"
|
||||
//LevelField is the output field key for the alert level information
|
||||
LevelField = "level"
|
||||
//LevelTag is the output tag key for the alert level information
|
||||
LevelTag = "level"
|
||||
// MessageField is the output field key for the message in the alert
|
||||
MessageField = "message"
|
||||
// DurationField is the output field key for the duration of the alert
|
||||
|
@ -36,31 +36,36 @@ func Vars(rule chronograf.AlertRule) (string, error) {
|
|||
case "threshold":
|
||||
vars := `
|
||||
%s
|
||||
var period = %s
|
||||
var crit = %s
|
||||
`
|
||||
return fmt.Sprintf(vars,
|
||||
common,
|
||||
rule.Critical,
|
||||
), nil
|
||||
rule.TriggerValues.Threshold.Period,
|
||||
rule.TriggerValues.Threshold.Value), nil
|
||||
case "relative":
|
||||
vars := `
|
||||
%s
|
||||
var period = %s
|
||||
var shift = -%s
|
||||
var crit = %s
|
||||
`
|
||||
return fmt.Sprintf(vars,
|
||||
common,
|
||||
rule.Shift,
|
||||
rule.Critical,
|
||||
rule.TriggerValues.Relative.Period,
|
||||
rule.TriggerValues.Relative.Shift,
|
||||
rule.TriggerValues.Relative.Value,
|
||||
), nil
|
||||
case "deadman":
|
||||
vars := `
|
||||
%s
|
||||
var threshold = %s
|
||||
var period = %s
|
||||
`
|
||||
return fmt.Sprintf(vars,
|
||||
common,
|
||||
"0.0", // deadman threshold hardcoded to zero
|
||||
rule.TriggerValues.Deadman.Period,
|
||||
), nil
|
||||
default:
|
||||
return "", fmt.Errorf("Unknown trigger mechanism")
|
||||
|
@ -81,18 +86,17 @@ func commonVars(rule chronograf.AlertRule) (string, error) {
|
|||
var groupby = %s
|
||||
var where_filter = %s
|
||||
|
||||
var period = %s
|
||||
var every = %s
|
||||
|
||||
var name = '%s'
|
||||
var idVar = name + ':{{.Group}}'
|
||||
var message = '%s'
|
||||
var idtag = '%s'
|
||||
var levelfield = '%s'
|
||||
var leveltag = '%s'
|
||||
var messagefield = '%s'
|
||||
var durationfield = '%s'
|
||||
|
||||
var metric = '%s'
|
||||
var value = 'value'
|
||||
|
||||
var output_db = '%s'
|
||||
var output_rp = '%s'
|
||||
|
@ -106,15 +110,13 @@ func commonVars(rule chronograf.AlertRule) (string, error) {
|
|||
fld,
|
||||
groupBy(rule.Query),
|
||||
whereFilter(rule.Query),
|
||||
rule.Period,
|
||||
rule.Every,
|
||||
rule.Name,
|
||||
rule.Message,
|
||||
IDTag,
|
||||
LevelField,
|
||||
LevelTag,
|
||||
MessageField,
|
||||
DurationField,
|
||||
metric(rule),
|
||||
Database,
|
||||
RP,
|
||||
Measurement,
|
||||
|
@ -137,8 +139,8 @@ func field(q chronograf.QueryConfig) (string, error) {
|
|||
return "", fmt.Errorf("No fields set in query")
|
||||
}
|
||||
|
||||
// metric will be metric unless there are no field aggregates. If no aggregates, then it is the field name.
|
||||
func metric(rule chronograf.AlertRule) string {
|
||||
// value will be "value"" unless there are no field aggregates. If no aggregates, then it is the field name.
|
||||
func value(rule chronograf.AlertRule) string {
|
||||
for _, field := range rule.Query.Fields {
|
||||
// Deadman triggers do not need any aggregate functions
|
||||
if field.Field != "" && rule.Trigger == "deadman" {
|
||||
|
@ -147,7 +149,7 @@ func metric(rule chronograf.AlertRule) string {
|
|||
return field.Field
|
||||
}
|
||||
}
|
||||
return "metric"
|
||||
return "value"
|
||||
}
|
||||
|
||||
func whereFilter(q chronograf.QueryConfig) string {
|
||||
|
|
Loading…
Reference in New Issue