From 12d22189c8e9143da30fd1527ace1e44c0ff407f Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Fri, 4 Nov 2016 16:28:28 -0500 Subject: [PATCH] Fix tickscript generation of lambda funcs with var fields. --- kapacitor/data.go | 8 ++++++-- kapacitor/influxout.go | 11 ++++++++--- kapacitor/influxout_test.go | 19 +++++++++++++++++-- kapacitor/tickscripts.go | 5 ++++- kapacitor/tickscripts_test.go | 12 +++--------- kapacitor/vars.go | 7 ------- 6 files changed, 38 insertions(+), 24 deletions(-) diff --git a/kapacitor/data.go b/kapacitor/data.go index 5c893ca46..71b8588b8 100644 --- a/kapacitor/data.go +++ b/kapacitor/data.go @@ -35,18 +35,22 @@ func Data(rule chronograf.AlertRule) (string, error) { // Only need aggregate functions for threshold and relative if rule.Trigger != "deadman" { + fld, err := field(rule.Query) + if err != nil { + return "", err + } value := "" for _, field := range rule.Query.Fields { for _, fnc := range field.Funcs { // Only need a window if we have an aggregate function value = value + "|window().period(period).every(every).align()\n" - value = value + fmt.Sprintf(`|%s(field).as('value')`, fnc) + value = value + fmt.Sprintf(`|%s('%s').as('value')`, fnc, fld) break // only support a single field } break // only support a single field } if value == "" { - value = `|eval(lambda: field).as('value')` + value = fmt.Sprintf(`|eval(lambda: '%s').as('value')`, fld) } stream = stream + value } diff --git a/kapacitor/influxout.go b/kapacitor/influxout.go index 1825be312..49180a765 100644 --- a/kapacitor/influxout.go +++ b/kapacitor/influxout.go @@ -7,12 +7,17 @@ import ( ) // InfluxOut creates a kapacitor influxDBOut node to write alert data to Database, RP, Measurement. -func InfluxOut(rule chronograf.AlertRule) string { +func InfluxOut(rule chronograf.AlertRule) (string, error) { // For some of the alert, the data needs to be renamed (normalized) // before being sent to influxdb. + rename := "" if rule.Trigger == "deadman" { - rename = `|eval(lambda: field).as('value')` + fld, err := field(rule.Query) + if err != nil { + return "", err + } + rename = fmt.Sprintf(`|eval(lambda: '%s').as('value')`, fld) } return fmt.Sprintf(` trigger @@ -24,5 +29,5 @@ func InfluxOut(rule chronograf.AlertRule) string { .measurement(output_mt) .tag('alertName', name) .tag('triggerType', triggerType) - `, rename) + `, rename), nil } diff --git a/kapacitor/influxout_test.go b/kapacitor/influxout_test.go index 9e83122b1..57cc06c88 100644 --- a/kapacitor/influxout_test.go +++ b/kapacitor/influxout_test.go @@ -11,7 +11,7 @@ func TestInfluxOut(t *testing.T) { { name: "Test influxDBOut kapacitor node", want: `trigger - |eval(lambda: field) + |eval(lambda: 'usage_user') .as('value') |influxDBOut() .create() @@ -24,10 +24,25 @@ func TestInfluxOut(t *testing.T) { }, } for _, tt := range tests { - got := InfluxOut(chronograf.AlertRule{ + got, err := InfluxOut(chronograf.AlertRule{ Name: "name", Trigger: "deadman", + Query: chronograf.QueryConfig{ + Fields: []struct { + Field string `json:"field"` + Funcs []string `json:"funcs"` + }{ + { + Field: "usage_user", + Funcs: []string{"mean"}, + }, + }, + }, }) + if err != nil { + t.Errorf("%q. InfluxOut()) error = %v", tt.name, err) + continue + } formatted, err := formatTick(got) if err != nil { t.Errorf("%q. formatTick() error = %v", tt.name, err) diff --git a/kapacitor/tickscripts.go b/kapacitor/tickscripts.go index b4aa5492e..ee8dff642 100644 --- a/kapacitor/tickscripts.go +++ b/kapacitor/tickscripts.go @@ -29,7 +29,10 @@ func (a *Alert) Generate(rule chronograf.AlertRule) (chronograf.TICKScript, erro if err != nil { return "", err } - output := InfluxOut(rule) + output, err := InfluxOut(rule) + if err != nil { + return "", err + } raw := fmt.Sprintf("%s\n%s\n%s%s\n%s", vars, data, trigger, services, output) tick, err := formatTick(raw) if err != nil { diff --git a/kapacitor/tickscripts_test.go b/kapacitor/tickscripts_test.go index 93a604a87..077d764c3 100644 --- a/kapacitor/tickscripts_test.go +++ b/kapacitor/tickscripts_test.go @@ -124,8 +124,6 @@ var rp = 'autogen' var measurement = 'cpu' -var field = 'usage_user' - var groupby = ['host', 'cluster_id'] var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod') @@ -169,7 +167,7 @@ var data = stream .period(period) .every(every) .align() - |mean(field) + |mean('usage_user') .as('value') var trigger = data @@ -275,8 +273,6 @@ var rp = 'autogen' var measurement = 'cpu' -var field = 'usage_user' - var groupby = ['host', 'cluster_id'] var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod') @@ -322,7 +318,7 @@ var data = stream .period(period) .every(every) .align() - |mean(field) + |mean('usage_user') .as('value') var past = data @@ -434,8 +430,6 @@ var rp = 'autogen' var measurement = 'cpu' -var field = 'usage_user' - var groupby = ['host', 'cluster_id'] var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod') @@ -488,7 +482,7 @@ var trigger = data .email() trigger - |eval(lambda: field) + |eval(lambda: 'usage_user') .as('value') |influxDBOut() .create() diff --git a/kapacitor/vars.go b/kapacitor/vars.go index d258c2d78..aa4d18714 100644 --- a/kapacitor/vars.go +++ b/kapacitor/vars.go @@ -71,16 +71,10 @@ func Vars(rule chronograf.AlertRule) (string, error) { } func commonVars(rule chronograf.AlertRule) (string, error) { - fld, err := field(rule.Query) - if err != nil { - return "", err - } - common := ` var db = '%s' var rp = '%s' var measurement = '%s' - var field = '%s' var groupby = %s var where_filter = %s var period = %s @@ -102,7 +96,6 @@ func commonVars(rule chronograf.AlertRule) (string, error) { rule.Query.Database, rule.Query.RetentionPolicy, rule.Query.Measurement, - fld, groupBy(rule.Query), whereFilter(rule.Query), rule.TriggerValues.Period,