Fix tickscript generation of lambda funcs with var fields.
parent
dcc2a2c162
commit
12d22189c8
|
@ -35,18 +35,22 @@ func Data(rule chronograf.AlertRule) (string, error) {
|
|||
// Only need aggregate functions for threshold and relative
|
||||
|
||||
if rule.Trigger != "deadman" {
|
||||
fld, err := field(rule.Query)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
value := ""
|
||||
for _, field := range rule.Query.Fields {
|
||||
for _, fnc := range field.Funcs {
|
||||
// Only need a window if we have an aggregate function
|
||||
value = value + "|window().period(period).every(every).align()\n"
|
||||
value = value + fmt.Sprintf(`|%s(field).as('value')`, fnc)
|
||||
value = value + fmt.Sprintf(`|%s('%s').as('value')`, fnc, fld)
|
||||
break // only support a single field
|
||||
}
|
||||
break // only support a single field
|
||||
}
|
||||
if value == "" {
|
||||
value = `|eval(lambda: field).as('value')`
|
||||
value = fmt.Sprintf(`|eval(lambda: '%s').as('value')`, fld)
|
||||
}
|
||||
stream = stream + value
|
||||
}
|
||||
|
|
|
@ -7,12 +7,17 @@ import (
|
|||
)
|
||||
|
||||
// InfluxOut creates a kapacitor influxDBOut node to write alert data to Database, RP, Measurement.
|
||||
func InfluxOut(rule chronograf.AlertRule) string {
|
||||
func InfluxOut(rule chronograf.AlertRule) (string, error) {
|
||||
// For some of the alert, the data needs to be renamed (normalized)
|
||||
// before being sent to influxdb.
|
||||
|
||||
rename := ""
|
||||
if rule.Trigger == "deadman" {
|
||||
rename = `|eval(lambda: field).as('value')`
|
||||
fld, err := field(rule.Query)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
rename = fmt.Sprintf(`|eval(lambda: '%s').as('value')`, fld)
|
||||
}
|
||||
return fmt.Sprintf(`
|
||||
trigger
|
||||
|
@ -24,5 +29,5 @@ func InfluxOut(rule chronograf.AlertRule) string {
|
|||
.measurement(output_mt)
|
||||
.tag('alertName', name)
|
||||
.tag('triggerType', triggerType)
|
||||
`, rename)
|
||||
`, rename), nil
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ func TestInfluxOut(t *testing.T) {
|
|||
{
|
||||
name: "Test influxDBOut kapacitor node",
|
||||
want: `trigger
|
||||
|eval(lambda: field)
|
||||
|eval(lambda: 'usage_user')
|
||||
.as('value')
|
||||
|influxDBOut()
|
||||
.create()
|
||||
|
@ -24,10 +24,25 @@ func TestInfluxOut(t *testing.T) {
|
|||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got := InfluxOut(chronograf.AlertRule{
|
||||
got, err := InfluxOut(chronograf.AlertRule{
|
||||
Name: "name",
|
||||
Trigger: "deadman",
|
||||
Query: chronograf.QueryConfig{
|
||||
Fields: []struct {
|
||||
Field string `json:"field"`
|
||||
Funcs []string `json:"funcs"`
|
||||
}{
|
||||
{
|
||||
Field: "usage_user",
|
||||
Funcs: []string{"mean"},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("%q. InfluxOut()) error = %v", tt.name, err)
|
||||
continue
|
||||
}
|
||||
formatted, err := formatTick(got)
|
||||
if err != nil {
|
||||
t.Errorf("%q. formatTick() error = %v", tt.name, err)
|
||||
|
|
|
@ -29,7 +29,10 @@ func (a *Alert) Generate(rule chronograf.AlertRule) (chronograf.TICKScript, erro
|
|||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
output := InfluxOut(rule)
|
||||
output, err := InfluxOut(rule)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
raw := fmt.Sprintf("%s\n%s\n%s%s\n%s", vars, data, trigger, services, output)
|
||||
tick, err := formatTick(raw)
|
||||
if err != nil {
|
||||
|
|
|
@ -124,8 +124,6 @@ var rp = 'autogen'
|
|||
|
||||
var measurement = 'cpu'
|
||||
|
||||
var field = 'usage_user'
|
||||
|
||||
var groupby = ['host', 'cluster_id']
|
||||
|
||||
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
@ -169,7 +167,7 @@ var data = stream
|
|||
.period(period)
|
||||
.every(every)
|
||||
.align()
|
||||
|mean(field)
|
||||
|mean('usage_user')
|
||||
.as('value')
|
||||
|
||||
var trigger = data
|
||||
|
@ -275,8 +273,6 @@ var rp = 'autogen'
|
|||
|
||||
var measurement = 'cpu'
|
||||
|
||||
var field = 'usage_user'
|
||||
|
||||
var groupby = ['host', 'cluster_id']
|
||||
|
||||
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
@ -322,7 +318,7 @@ var data = stream
|
|||
.period(period)
|
||||
.every(every)
|
||||
.align()
|
||||
|mean(field)
|
||||
|mean('usage_user')
|
||||
.as('value')
|
||||
|
||||
var past = data
|
||||
|
@ -434,8 +430,6 @@ var rp = 'autogen'
|
|||
|
||||
var measurement = 'cpu'
|
||||
|
||||
var field = 'usage_user'
|
||||
|
||||
var groupby = ['host', 'cluster_id']
|
||||
|
||||
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
||||
|
@ -488,7 +482,7 @@ var trigger = data
|
|||
.email()
|
||||
|
||||
trigger
|
||||
|eval(lambda: field)
|
||||
|eval(lambda: 'usage_user')
|
||||
.as('value')
|
||||
|influxDBOut()
|
||||
.create()
|
||||
|
|
|
@ -71,16 +71,10 @@ func Vars(rule chronograf.AlertRule) (string, error) {
|
|||
}
|
||||
|
||||
func commonVars(rule chronograf.AlertRule) (string, error) {
|
||||
fld, err := field(rule.Query)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
common := `
|
||||
var db = '%s'
|
||||
var rp = '%s'
|
||||
var measurement = '%s'
|
||||
var field = '%s'
|
||||
var groupby = %s
|
||||
var where_filter = %s
|
||||
var period = %s
|
||||
|
@ -102,7 +96,6 @@ func commonVars(rule chronograf.AlertRule) (string, error) {
|
|||
rule.Query.Database,
|
||||
rule.Query.RetentionPolicy,
|
||||
rule.Query.Measurement,
|
||||
fld,
|
||||
groupBy(rule.Query),
|
||||
whereFilter(rule.Query),
|
||||
rule.TriggerValues.Period,
|
||||
|
|
Loading…
Reference in New Issue