From 2ae2b06c57077976be60b41ea7d0694ea6dd34d7 Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Fri, 6 Sep 2019 10:03:33 -0400 Subject: [PATCH] fix(notification/check): remove the aggregateWindow from the deadman query If the user specifies an aggregate window in the query they provide us, we remove it so that we can property detect the deadman. This is not the solution we will want long term, but should be sufficient for now. --- notification/check/deadman.go | 1 + notification/check/deadman_test.go | 59 ++++++++++++++++++++++++++++++ notification/check/threshold.go | 14 +++++++ 3 files changed, 74 insertions(+) diff --git a/notification/check/deadman.go b/notification/check/deadman.go index 9e96e7cad0..e8fe381e80 100644 --- a/notification/check/deadman.go +++ b/notification/check/deadman.go @@ -45,6 +45,7 @@ func (c Deadman) GenerateFlux() (string, error) { // an error for each error found when the script is parsed. func (c Deadman) GenerateFluxAST() (*ast.Package, error) { p := parser.ParseSource(c.Query.Text) + removeAggregateWindow(p) replaceDurationsWithEvery(p, c.StaleTime) removeStopFromRange(p) diff --git a/notification/check/deadman_test.go b/notification/check/deadman_test.go index 71095c8133..e44854c97d 100644 --- a/notification/check/deadman_test.go +++ b/notification/check/deadman_test.go @@ -21,6 +21,65 @@ func TestDeadman_GenerateFlux(t *testing.T) { args args wants wants }{ + { + name: "with aggregateWindow", + args: args{ + deadman: check.Deadman{ + Base: check.Base{ + ID: 10, + Name: "moo", + Tags: []influxdb.Tag{ + {Key: "aaa", Value: "vaaa"}, + {Key: "bbb", Value: "vbbb"}, + }, + Every: mustDuration("1h"), + StatusMessageTemplate: "whoa! {r.dead}", + Query: influxdb.DashboardQuery{ + Text: `from(bucket: "foo") |> range(start: -1d, stop: now()) |> aggregateWindow(fn: mean, every: 1m) |> yield()`, + BuilderConfig: influxdb.BuilderConfig{ + Tags: []struct { + Key string `json:"key"` + Values []string `json:"values"` + }{ + { + Key: "_field", + Values: []string{"usage_user"}, + }, + }, + }, + }, + }, + TimeSince: mustDuration("60s"), + StaleTime: mustDuration("10m"), + Level: notification.Info, + }, + }, + wants: wants{ + script: `package main +import "influxdata/influxdb/monitor" +import "experimental" + +data = from(bucket: "foo") + |> range(start: -10m) + +option task = {name: "moo", every: 1h} + +check = { + _check_id: "000000000000000a", + _check_name: "moo", + _type: "deadman", + tags: {aaa: "vaaa", bbb: "vbbb"}, +} +info = (r) => + (r.dead) +messageFn = (r) => + ("whoa! {r.dead}") + +data + |> monitor.deadman(t: experimental.subDuration(from: now(), d: 60s)) + |> monitor.check(data: check, messageFn: messageFn, info: info)`, + }, + }, { name: "basic", args: args{ diff --git a/notification/check/threshold.go b/notification/check/threshold.go index df1bf862f8..facf5e67a2 100644 --- a/notification/check/threshold.go +++ b/notification/check/threshold.go @@ -208,6 +208,20 @@ func removeStopFromRange(pkg *ast.Package) { }) } +// TODO(desa): we'll likely want to remove all other arguments to range that are provided, but for now this should work. +// When we decide to implement the full feature we'll have to do something more sophisticated. +func removeAggregateWindow(pkg *ast.Package) { + ast.Visit(pkg, func(n ast.Node) { + if pipe, ok := n.(*ast.PipeExpression); ok { + if id, ok := pipe.Call.Callee.(*ast.Identifier); ok && id.Name == "aggregateWindow" { + if subPipe, ok := pipe.Argument.(*ast.PipeExpression); ok { + *pipe = *subPipe + } + } + } + }) +} + func assignPipelineToData(f *ast.File) error { if len(f.Body) != 1 { return fmt.Errorf("expected there to be a single statement in the flux script body, recieved %d", len(f.Body))