Add initial reversing of TICKscript to query config
parent
3107408851
commit
d930961d32
|
@ -0,0 +1,145 @@
|
|||
package kapacitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
"github.com/influxdata/kapacitor/pipeline"
|
||||
"github.com/influxdata/kapacitor/tick"
|
||||
"github.com/influxdata/kapacitor/tick/ast"
|
||||
"github.com/influxdata/kapacitor/tick/stateful"
|
||||
)
|
||||
|
||||
// Reverse converts tickscript to an AlertRule
|
||||
func Reverse(script chronograf.TICKScript) (chronograf.AlertRule, error) {
|
||||
rule := chronograf.AlertRule{
|
||||
Alerts: []string{},
|
||||
}
|
||||
scope := stateful.NewScope()
|
||||
template, err := pipeline.CreateTemplatePipeline(string(script), pipeline.StreamEdge, scope, &deadman{})
|
||||
if err != nil {
|
||||
return chronograf.AlertRule{}, err
|
||||
}
|
||||
vars := template.Vars()
|
||||
if err := valueStr("db", &rule.Query.Database, vars); err != nil {
|
||||
return chronograf.AlertRule{}, err
|
||||
}
|
||||
rule.Query.RetentionPolicy = vars["rp"].Value.(string)
|
||||
rule.Query.Measurement = vars["measurement"].Value.(string)
|
||||
rule.Name = vars["name"].Value.(string)
|
||||
rule.Trigger = vars["triggerType"].Value.(string)
|
||||
rule.Every = vars["every"].Value.(time.Duration).String()
|
||||
// Convert to just minutes or hours
|
||||
rule.Query.GroupBy.Time = vars["period"].Value.(time.Duration).String()
|
||||
rule.Message = vars["message"].Value.(string)
|
||||
rule.Details = vars["details"].Value.(string)
|
||||
if v, ok := vars["lower"]; ok {
|
||||
rule.TriggerValues.Value = fmt.Sprintf("%v", v.Value)
|
||||
}
|
||||
if v, ok := vars["upper"]; ok {
|
||||
rule.TriggerValues.RangeValue = fmt.Sprintf("%v", v.Value)
|
||||
}
|
||||
if v, ok := vars["crit"]; ok {
|
||||
rule.TriggerValues.Value = fmt.Sprintf("%v", v.Value)
|
||||
}
|
||||
if v, ok := vars["groupBy"]; ok {
|
||||
groups := v.Value.([]tick.Var)
|
||||
rule.Query.GroupBy.Tags = make([]string, len(groups))
|
||||
for i, g := range groups {
|
||||
rule.Query.GroupBy.Tags[i] = g.Value.(string)
|
||||
}
|
||||
}
|
||||
if v, ok := vars["whereFilter"]; ok {
|
||||
rule.Query.Tags = make(map[string][]string)
|
||||
value := v.Value.(*ast.LambdaNode)
|
||||
var re = regexp.MustCompile(`(?U)"(.*)"\s+(==|!=)\s+'(.*)'`)
|
||||
for _, match := range re.FindAllStringSubmatch(value.ExpressionString(), -1) {
|
||||
if match[2] == "==" {
|
||||
rule.Query.AreTagsAccepted = true
|
||||
}
|
||||
tag, value := match[1], match[3]
|
||||
values, ok := rule.Query.Tags[tag]
|
||||
if !ok {
|
||||
values = []string{}
|
||||
}
|
||||
values = append(values, value)
|
||||
rule.Query.Tags[tag] = values
|
||||
}
|
||||
}
|
||||
|
||||
// Only if non-deadman
|
||||
var re = regexp.MustCompile(`(?Um)\|(\w+)\('(.*)'\)\s+\.as\(\'.*\'\)`)
|
||||
for _, match := range re.FindAllStringSubmatch(string(script), -1) {
|
||||
fn, field := match[1], match[2]
|
||||
rule.Query.Fields = []chronograf.Field{
|
||||
chronograf.Field{
|
||||
Field: field,
|
||||
Funcs: []string{
|
||||
fn,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
p, err := pipeline.CreatePipeline(string(script), pipeline.StreamEdge, stateful.NewScope(), &deadman{}, vars)
|
||||
if err != nil {
|
||||
return chronograf.AlertRule{}, err
|
||||
}
|
||||
|
||||
p.Walk(func(n pipeline.Node) error {
|
||||
switch t := n.(type) {
|
||||
case *pipeline.AlertNode:
|
||||
bin, ok := t.Crit.Expression.(*ast.BinaryNode)
|
||||
if ok {
|
||||
oper := bin.Operator
|
||||
if oper == ast.TokenAnd || oper == ast.TokenOr {
|
||||
lhs, lok := bin.Left.(*ast.BinaryNode)
|
||||
rhs, rok := bin.Right.(*ast.BinaryNode)
|
||||
if rok && lok {
|
||||
op, err := chronoRangeOperators([]string{
|
||||
lhs.String(),
|
||||
oper.String(),
|
||||
rhs.String(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rule.TriggerValues.Operator = op
|
||||
}
|
||||
} else {
|
||||
op, err := chronoOperator(bin.Operator.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rule.TriggerValues.Operator = op
|
||||
}
|
||||
}
|
||||
if t.VictorOps() != nil {
|
||||
rule.Alerts = append(rule.Alerts, "victorops")
|
||||
}
|
||||
if t.Slack() != nil {
|
||||
rule.Alerts = append(rule.Alerts, "slack")
|
||||
}
|
||||
if t.Email() != nil {
|
||||
rule.Alerts = append(rule.Alerts, "email")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return rule, err
|
||||
}
|
||||
|
||||
func valueStr(key string, value *string, vars map[string]tick.Var) error {
|
||||
v, ok := vars[key]
|
||||
if !ok {
|
||||
return fmt.Errorf("No %s", key)
|
||||
}
|
||||
val, ok := v.Value.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("No %s", key)
|
||||
}
|
||||
value = &val
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,197 @@
|
|||
package kapacitor
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
||||
func TestReverse(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
script chronograf.TICKScript
|
||||
want chronograf.AlertRule
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "simple stream tickscript",
|
||||
script: chronograf.TICKScript(`
|
||||
var name = 'name'
|
||||
var triggerType = 'threshold'
|
||||
var every = 30s
|
||||
var period = 10m
|
||||
var groupBy = ['host', 'cluster_id']
|
||||
var db = 'telegraf'
|
||||
var rp = 'autogen'
|
||||
var measurement = 'cpu'
|
||||
var message = 'message'
|
||||
var details = 'details'
|
||||
var crit = 90
|
||||
var idVar = name + ':{{.Group}}'
|
||||
var idTag = 'alertID'
|
||||
var levelTag = 'level'
|
||||
var messageField = 'message'
|
||||
var durationField = 'duration'
|
||||
var whereFilter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
|window()
|
||||
.period(period)
|
||||
.every(every)
|
||||
.align()
|
||||
|mean('usage_user')
|
||||
.as('value')
|
||||
var trigger = data
|
||||
|alert()
|
||||
.crit(lambda: "value" > crit)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idTag)
|
||||
.levelTag(levelTag)
|
||||
.messageField(messageField)
|
||||
.durationField(durationField)
|
||||
.slack()
|
||||
.victorOps()
|
||||
.email()
|
||||
`),
|
||||
|
||||
want: chronograf.AlertRule{
|
||||
Name: "name",
|
||||
Trigger: "threshold",
|
||||
Alerts: []string{"victorops", "slack", "email"},
|
||||
TriggerValues: chronograf.TriggerValues{
|
||||
Operator: "greater than",
|
||||
Value: "90",
|
||||
},
|
||||
Every: "30s",
|
||||
Message: "message",
|
||||
Details: "details",
|
||||
Query: chronograf.QueryConfig{
|
||||
Database: "telegraf",
|
||||
RetentionPolicy: "autogen",
|
||||
Measurement: "cpu",
|
||||
Fields: []chronograf.Field{
|
||||
{
|
||||
Field: "usage_user",
|
||||
Funcs: []string{
|
||||
"mean",
|
||||
},
|
||||
},
|
||||
},
|
||||
GroupBy: chronograf.GroupBy{
|
||||
Time: "10m0s",
|
||||
Tags: []string{"host", "cluster_id"},
|
||||
},
|
||||
Tags: map[string][]string{
|
||||
"cpu": []string{
|
||||
"cpu_total",
|
||||
},
|
||||
"host": []string{
|
||||
"acc-0eabc309-eu-west-1-data-3",
|
||||
"prod",
|
||||
},
|
||||
},
|
||||
AreTagsAccepted: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test Threshold",
|
||||
script: `var db = 'telegraf'
|
||||
|
||||
var rp = 'autogen'
|
||||
|
||||
var measurement = 'cpu'
|
||||
|
||||
var groupBy = ['host', 'cluster_id']
|
||||
|
||||
var whereFilter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
||||
var period = 10m
|
||||
|
||||
var every = 30s
|
||||
|
||||
var name = 'name'
|
||||
|
||||
var idVar = name + ':{{.Group}}'
|
||||
|
||||
var message = 'message'
|
||||
|
||||
var idTag = 'alertID'
|
||||
|
||||
var levelTag = 'level'
|
||||
|
||||
var messageField = 'message'
|
||||
|
||||
var durationField = 'duration'
|
||||
|
||||
var outputDB = 'chronograf'
|
||||
|
||||
var outputRP = 'autogen'
|
||||
|
||||
var outputMeasurement = 'alerts'
|
||||
|
||||
var triggerType = 'threshold'
|
||||
|
||||
var crit = 90
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(db)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupBy)
|
||||
.where(whereFilter)
|
||||
|window()
|
||||
.period(period)
|
||||
.every(every)
|
||||
.align()
|
||||
|mean('usage_user')
|
||||
.as('value')
|
||||
|
||||
var trigger = data
|
||||
|alert()
|
||||
.crit(lambda: "value" > crit)
|
||||
.stateChangesOnly()
|
||||
.message(message)
|
||||
.id(idVar)
|
||||
.idTag(idTag)
|
||||
.levelTag(levelTag)
|
||||
.messageField(messageField)
|
||||
.durationField(durationField)
|
||||
.slack()
|
||||
.victorOps()
|
||||
.email()
|
||||
|
||||
trigger
|
||||
|influxDBOut()
|
||||
.create()
|
||||
.database(outputDB)
|
||||
.retentionPolicy(outputRP)
|
||||
.measurement(outputMeasurement)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
|
||||
trigger
|
||||
|httpOut('output')`,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := Reverse(tt.script)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Reverse() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("Reverse() = \n%#v\n, want \n%#v\n", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -33,6 +33,25 @@ func kapaOperator(operator string) (string, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func chronoOperator(operator string) (string, error) {
|
||||
switch operator {
|
||||
case ">":
|
||||
return greaterThan, nil
|
||||
case "<":
|
||||
return lessThan, nil
|
||||
case "<=":
|
||||
return LessThanEqual, nil
|
||||
case ">=":
|
||||
return GreaterThanEqual, nil
|
||||
case "==":
|
||||
return Equal, nil
|
||||
case "!=":
|
||||
return NotEqual, nil
|
||||
default:
|
||||
return "", fmt.Errorf("invalid operator: %s is unknown", operator)
|
||||
}
|
||||
}
|
||||
|
||||
func rangeOperators(operator string) ([]string, error) {
|
||||
switch operator {
|
||||
case InsideRange:
|
||||
|
@ -43,3 +62,15 @@ func rangeOperators(operator string) ([]string, error) {
|
|||
return nil, fmt.Errorf("invalid operator: %s is unknown", operator)
|
||||
}
|
||||
}
|
||||
|
||||
func chronoRangeOperators(ops []string) (string, error) {
|
||||
if len(ops) != 3 {
|
||||
return "", fmt.Errorf("Unknown operators")
|
||||
}
|
||||
if ops[0] == ">=" && ops[1] == "AND" && ops[2] == "<=" {
|
||||
return InsideRange, nil
|
||||
} else if ops[0] == "<" && ops[1] == "OR" && ops[2] == ">" {
|
||||
return OutsideRange, nil
|
||||
}
|
||||
return "", fmt.Errorf("Unknown operators")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue