feat(notification/check/threshold): add createEmpty false to aggregateWindow

pull/14881/head
Michael Desa 2019-08-29 18:36:18 -04:00
parent 5d1c4d814b
commit 0da82319a8
No known key found for this signature in database
GPG Key ID: 87002651EC5DFFE6
4 changed files with 34 additions and 40 deletions

View File

@ -408,7 +408,7 @@ func TestService_handleGetCheckQuery(t *testing.T) {
wants: wants{
statusCode: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: `{"flux":"package main\nimport \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"foo\")\n\t|\u003e range(start: -1h)\n\t|\u003e aggregateWindow(every: 1h, fn: mean)\n\noption task = {name: \"hello\", every: 1h}\n\ncheck = {\n\t_check_id: \"020f755c3c082000\",\n\t_check_name: \"hello\",\n\t_check_type: \"threshold\",\n\ttags: {aaa: \"vaaa\", bbb: \"vbbb\"},\n}\nok = (r) =\u003e\n\t(r.usage_user \u003e 10.0)\ninfo = (r) =\u003e\n\t(r.usage_user \u003c 40.0)\nwarn = (r) =\u003e\n\t(r.usage_user \u003c 40.0 and r.usage_user \u003e 10.0)\ncrit = (r) =\u003e\n\t(r.usage_user \u003c 40.0 and r.usage_user \u003e 10.0)\nmessageFn = (r) =\u003e\n\t(\"whoa! {check.yeah}\")\n\ndata\n\t|\u003e v1.fieldsAsCols()\n\t|\u003e monitor.check(\n\t\tdata: check,\n\t\tmessageFn: messageFn,\n\t\tok: ok,\n\t\tinfo: info,\n\t\twarn: warn,\n\t\tcrit: crit,\n\t)"}`,
body: `{"flux":"package main\nimport \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"foo\")\n\t|\u003e range(start: -1h)\n\t|\u003e aggregateWindow(every: 1h, fn: mean, createEmpty: false)\n\noption task = {name: \"hello\", every: 1h}\n\ncheck = {\n\t_check_id: \"020f755c3c082000\",\n\t_check_name: \"hello\",\n\t_check_type: \"threshold\",\n\ttags: {aaa: \"vaaa\", bbb: \"vbbb\"},\n}\nok = (r) =\u003e\n\t(r.usage_user \u003e 10.0)\ninfo = (r) =\u003e\n\t(r.usage_user \u003c 40.0)\nwarn = (r) =\u003e\n\t(r.usage_user \u003c 40.0 and r.usage_user \u003e 10.0)\ncrit = (r) =\u003e\n\t(r.usage_user \u003c 40.0 and r.usage_user \u003e 10.0)\nmessageFn = (r) =\u003e\n\t(\"whoa! {check.yeah}\")\n\ndata\n\t|\u003e v1.fieldsAsCols()\n\t|\u003e monitor.check(\n\t\tdata: check,\n\t\tmessageFn: messageFn,\n\t\tok: ok,\n\t\tinfo: info,\n\t\twarn: warn,\n\t\tcrit: crit,\n\t)"}`,
},
},
}

View File

@ -105,7 +105,7 @@ func multiError(errs []error) error {
// are any errors in the flux that the user provided the function will return
// an error for each error found when the script is parsed.
func (t Threshold) GenerateFlux() (string, error) {
p, err := t.GenerateFluxASTReal()
p, err := t.GenerateFluxAST()
if err != nil {
return "", err
}
@ -117,8 +117,10 @@ func (t Threshold) GenerateFlux() (string, error) {
// are any errors in the flux that the user provided the function will return
// an error for each error found when the script is parsed.
func (t Threshold) GenerateFluxAST() (*ast.Package, error) {
// TODO(desa): remove this function)
p := parser.ParseSource(t.Query.Text)
replaceDurationsWithEvery(p, t.Every)
removeStopFromRange(p)
addCreateEmptyFalseToAggregateWindow(p)
if errs := ast.GetErrors(p); len(errs) != 0 {
return nil, multiError(errs)
@ -130,15 +132,11 @@ func (t Threshold) GenerateFluxAST() (*ast.Package, error) {
return nil, fmt.Errorf("expect a single file to be returned from query parsing got %d", len(p.Files))
}
if _, err := t.getSelectedField(); err != nil {
return nil, err
}
f := p.Files[0]
f.Body = append(f.Body, t.generateTaskOption())
assignPipelineToData(f)
//replaceDurationsWithEvery(p, t.Every)
//removeStopFromRange(p)
f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "influxdata/influxdb/v1")...)
f.Body = append(f.Body, t.generateFluxASTBody()...)
return p, nil
}
@ -156,6 +154,21 @@ func (t Threshold) getSelectedField() (string, error) {
return "", fmt.Errorf("no field was selected")
}
// TODO(desa): we'll likely want something slightly more sophisitcated long term, but this should work for now.
func addCreateEmptyFalseToAggregateWindow(pkg *ast.Package) {
ast.Visit(pkg, func(n ast.Node) {
if call, ok := n.(*ast.CallExpression); ok {
if id, ok := call.Callee.(*ast.Identifier); ok && id.Name == "aggregateWindow" {
for _, args := range call.Arguments {
if obj, ok := args.(*ast.ObjectExpression); ok {
obj.Properties = append(obj.Properties, flux.Property("createEmpty", flux.Bool(false)))
}
}
}
}
})
}
// TODO(desa): we'll likely want something slightly more sophisitcated long term, but this should work for now.
func replaceDurationsWithEvery(pkg *ast.Package, every *notification.Duration) {
ast.Visit(pkg, func(n ast.Node) {
@ -222,32 +235,6 @@ func assignPipelineToData(f *ast.File) error {
return nil
}
// GenerateFluxASTReal is the real version of GenerateFluxAST. It has to exist so staticheck doesn't yell about
// the unexported functions I have here.
func (t Threshold) GenerateFluxASTReal() (*ast.Package, error) {
p := parser.ParseSource(t.Query.Text)
replaceDurationsWithEvery(p, t.Every)
removeStopFromRange(p)
if errs := ast.GetErrors(p); len(errs) != 0 {
return nil, multiError(errs)
}
// TODO(desa): this is a hack that we had to do as a result of https://github.com/influxdata/flux/issues/1701
// when it is fixed we should use a separate file and not manipulate the existing one.
if len(p.Files) != 1 {
return nil, fmt.Errorf("expect a single file to be returned from query parsing got %d", len(p.Files))
}
f := p.Files[0]
assignPipelineToData(f)
f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "influxdata/influxdb/v1")...)
f.Body = append(f.Body, t.generateFluxASTBody()...)
return p, nil
}
func (t Threshold) generateFluxASTBody() []ast.Statement {
var statements []ast.Statement
statements = append(statements, t.generateTaskOption())

View File

@ -92,7 +92,7 @@ import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: mean)
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1h}
@ -192,7 +192,7 @@ import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: mean)
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1h}
@ -292,7 +292,7 @@ import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: mean)
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1h}
@ -331,7 +331,7 @@ data
t.Run(tt.name, func(t *testing.T) {
// TODO(desa): change this to GenerateFlux() when we don't need to code
// around the monitor package not being available.
p, err := tt.args.threshold.GenerateFluxASTReal()
p, err := tt.args.threshold.GenerateFluxAST()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -115,6 +115,13 @@ func String(s string) *ast.StringLiteral {
}
}
// Bool returns an *ast.BooleanLiteral of b.
func Bool(b bool) *ast.BooleanLiteral {
return &ast.BooleanLiteral{
Value: b,
}
}
// Duration returns an *ast.DurationLiteral for a single duration.
func Duration(m int64, u string) *ast.DurationLiteral {
return &ast.DurationLiteral{