fix: switch flux formatter to one that preserves comments (#22186)

pull/22192/head
Daniel Moran 2021-08-12 14:17:18 -04:00 committed by GitHub
parent a160a1d47c
commit 07d897d2f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 426 additions and 779 deletions

View File

@ -78,6 +78,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21950](https://github.com/influxdata/influxdb/pull/21950): Invalid requests to /api/v2 subroutes now return 404 instead of a list of links.
1. [21962](https://github.com/influxdata/influxdb/pull/21962): Flux metaqueries for `_field` take fast path if `_measurement` is the only predicate.
1. [22059](https://github.com/influxdata/influxdb/pull/22059): Copy names from mmapped memory before closing iterator.
1. [22186](https://github.com/influxdata/influxdb/pull/22186): Preserve comments in Flux queries when saving task definitions.
## v2.0.7 [2021-06-04]

View File

@ -317,7 +317,7 @@ func CreateCheck(
Organization: "theorg",
OwnerID: MustIDBase16("020f755c3c082001"),
Status: "active",
Flux: "package main\nimport \"influxdata/influxdb/monitor\"\nimport \"experimental\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\")\n\t|> range(start: -1h)\n\t|> filter(fn: (r) =>\n\t\t(r._field == \"usage_user\"))\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {\n\t_check_id: \"020f755c3c082000\",\n\t_check_name: \"name1\",\n\t_type: \"deadman\",\n\ttags: {k1: \"v1\", k2: \"v2\"},\n}\ncrit = (r) =>\n\t(r[\"dead\"])\nmessageFn = (r) =>\n\t(\"msg1\")\n\ndata\n\t|> v1[\"fieldsAsCols\"]()\n\t|> monitor[\"deadman\"](t: experimental[\"subDuration\"](from: now(), d: 21s))\n\t|> monitor[\"check\"](data: check, messageFn: messageFn, crit: crit)",
Flux: "import \"influxdata/influxdb/monitor\"\nimport \"experimental\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\") |> range(start: -1h) |> filter(fn: (r) => r._field == \"usage_user\")\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {_check_id: \"020f755c3c082000\", _check_name: \"name1\", _type: \"deadman\", tags: {k1: \"v1\", k2: \"v2\"}}\ncrit = (r) => r[\"dead\"]\nmessageFn = (r) => \"msg1\"\n\ndata |> v1[\"fieldsAsCols\"]() |> monitor[\"deadman\"](t: experimental[\"subDuration\"](from: now(), d: 21s))\n |> monitor[\"check\"](data: check, messageFn: messageFn, crit: crit)",
Every: "1m",
},
},
@ -447,7 +447,7 @@ func CreateCheck(
OwnerID: MustIDBase16("020f755c3c082005"),
Status: "active",
Every: "1m",
Flux: "package main\nimport \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\")\n\t|> range(start: -1m)\n\t|> filter(fn: (r) =>\n\t\t(r._field == \"usage_user\"))\n\noption task = {name: \"name2\", every: 1m}\n\ncheck = {\n\t_check_id: \"020f755c3c082001\",\n\t_check_name: \"name2\",\n\t_type: \"threshold\",\n\ttags: {k11: \"v11\"},\n}\nok = (r) =>\n\t(r[\"usage_user\"] < 1000.0)\nwarn = (r) =>\n\t(r[\"usage_user\"] > 2000.0)\ninfo = (r) =>\n\t(r[\"usage_user\"] < 1900.0 and r[\"usage_user\"] > 1500.0)\nmessageFn = (r) =>\n\t(\"msg2\")\n\ndata\n\t|> v1[\"fieldsAsCols\"]()\n\t|> monitor[\"check\"](\n\t\tdata: check,\n\t\tmessageFn: messageFn,\n\t\tok: ok,\n\t\twarn: warn,\n\t\tinfo: info,\n\t)",
Flux: "import \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\") |> range(start: -1m) |> filter(fn: (r) => r._field == \"usage_user\")\n\noption task = {name: \"name2\", every: 1m}\n\ncheck = {_check_id: \"020f755c3c082001\", _check_name: \"name2\", _type: \"threshold\", tags: {k11: \"v11\"}}\nok = (r) => r[\"usage_user\"] < 1000.0\nwarn = (r) => r[\"usage_user\"] > 2000.0\ninfo = (r) => r[\"usage_user\"] < 1900.0 and r[\"usage_user\"] > 1500.0\nmessageFn = (r) => \"msg2\"\n\ndata |> v1[\"fieldsAsCols\"]() |> monitor[\"check\"](\n data: check,\n messageFn: messageFn,\n ok: ok,\n warn: warn,\n info: info,\n)",
},
},
},
@ -584,7 +584,7 @@ func CreateCheck(
OwnerID: MustIDBase16("020f755c3c082001"),
Status: "active",
Every: "1m",
Flux: "package main\nimport \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\")\n\t|> range(start: -1m)\n\t|> filter(fn: (r) =>\n\t\t(r._field == \"usage_user\"))\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {\n\t_check_id: \"020f755c3c082001\",\n\t_check_name: \"name1\",\n\t_type: \"threshold\",\n\ttags: {k11: \"v11\", k22: \"v22\"},\n}\nmessageFn = (r) =>\n\t(\"msg2\")\n\ndata\n\t|> v1[\"fieldsAsCols\"]()\n\t|> monitor[\"check\"](data: check, messageFn: messageFn)",
Flux: "import \"influxdata/influxdb/monitor\"\nimport \"influxdata/influxdb/v1\"\n\ndata = from(bucket: \"telegraf\") |> range(start: -1m) |> filter(fn: (r) => r._field == \"usage_user\")\n\noption task = {name: \"name1\", every: 1m}\n\ncheck = {_check_id: \"020f755c3c082001\", _check_name: \"name1\", _type: \"threshold\", tags: {k11: \"v11\", k22: \"v22\"}}\nmessageFn = (r) => \"msg2\"\n\ndata |> v1[\"fieldsAsCols\"]() |> monitor[\"check\"](data: check, messageFn: messageFn)",
},
},
checks: []influxdb.Check{

View File

@ -3832,15 +3832,12 @@ spec:
expectedQuery := expectedParams + `
from(bucket: params.bucket)
|> range(start: params.start, end: params.stop)
|> filter(fn: (r) =>
(r._measurement == "processes"))
|> filter(fn: (r) =>
(r.floater == params.floatVal))
|> filter(fn: (r) =>
(r._value > params.minVal))
|> aggregateWindow(every: v.windowPeriod, fn: max)
|> yield(name: params.name)`
|> range(start: params.start, end: params.stop)
|> filter(fn: (r) => r._measurement == "processes")
|> filter(fn: (r) => r.floater == params.floatVal)
|> filter(fn: (r) => r._value > params.minVal)
|> aggregateWindow(every: v.windowPeriod, fn: max)
|> yield(name: params.name)`
assert.Equal(t, expectedQuery, props.Queries[0].Text)
assert.Equal(t, "advanced", props.Queries[0].EditMode)
@ -3869,12 +3866,12 @@ from(bucket: params.bucket)
actual := impact.Summary.Dashboards[0]
expectedParams := `option params = {
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
}`
isExpectedQuery(t, actual, expectedParams)
@ -3955,12 +3952,12 @@ from(bucket: params.bucket)
actual := impact.Summary.Dashboards[0]
expectedParams := `option params = {
bucket: "foobar",
start: -5d,
stop: now(),
name: "min",
floatVal: 33.3,
minVal: 3,
bucket: "foobar",
start: -5d,
stop: now(),
name: "min",
floatVal: 33.3,
minVal: 3,
}`
isExpectedQuery(t, actual, expectedParams)
@ -4084,15 +4081,12 @@ spec:
expectedQuery := expectedParams + `
from(bucket: params.bucket)
|> range(start: params.start, stop: params.stop)
|> filter(fn: (r) =>
(r._measurement == "processes"))
|> filter(fn: (r) =>
(r.floater == params.floatVal))
|> filter(fn: (r) =>
(r._value > params.minVal))
|> aggregateWindow(every: 1m, fn: max)
|> yield(name: params.name)`
|> range(start: params.start, stop: params.stop)
|> filter(fn: (r) => r._measurement == "processes")
|> filter(fn: (r) => r.floater == params.floatVal)
|> filter(fn: (r) => r._value > params.minVal)
|> aggregateWindow(every: 1m, fn: max)
|> yield(name: params.name)`
assert.Equal(t, expectedQuery, actual.Query)
}
@ -4120,12 +4114,12 @@ from(bucket: params.bucket)
actual := impact.Summary.Tasks[0]
expectedParams := `option params = {
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
}`
isExpectedQuery(t, actual, expectedParams)
@ -4206,12 +4200,12 @@ from(bucket: params.bucket)
actual := impact.Summary.Tasks[0]
expectedParams := `option params = {
bucket: "foobar",
start: -5d,
stop: now(),
name: "min",
floatVal: 33.3,
minVal: 3,
bucket: "foobar",
start: -5d,
stop: now(),
name: "min",
floatVal: 33.3,
minVal: 3,
}`
isExpectedQuery(t, actual, expectedParams)

View File

@ -418,7 +418,7 @@ func TestService_handleGetCheckQuery(t *testing.T) {
wants: wants{
statusCode: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: "{\"flux\":\"package main\\nimport \\\"influxdata/influxdb/monitor\\\"\\nimport \\\"influxdata/influxdb/v1\\\"\\n\\ndata = from(bucket: \\\"foo\\\")\\n\\t|\\u003e range(start: -1h)\\n\\t|\\u003e filter(fn: (r) =\\u003e\\n\\t\\t(r._field == \\\"usage_idle\\\"))\\n\\t|\\u003e aggregateWindow(every: 1h, fn: mean, createEmpty: false)\\n\\noption task = {name: \\\"hello\\\", every: 1h}\\n\\ncheck = {\\n\\t_check_id: \\\"020f755c3c082000\\\",\\n\\t_check_name: \\\"hello\\\",\\n\\t_type: \\\"threshold\\\",\\n\\ttags: {aaa: \\\"vaaa\\\", bbb: \\\"vbbb\\\"},\\n}\\nok = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003e 10.0)\\ninfo = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003c 40.0)\\nwarn = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0)\\ncrit = (r) =\\u003e\\n\\t(r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0)\\nmessageFn = (r) =\\u003e\\n\\t(\\\"whoa! {check.yeah}\\\")\\n\\ndata\\n\\t|\\u003e v1[\\\"fieldsAsCols\\\"]()\\n\\t|\\u003e monitor[\\\"check\\\"](\\n\\t\\tdata: check,\\n\\t\\tmessageFn: messageFn,\\n\\t\\tok: ok,\\n\\t\\tinfo: info,\\n\\t\\twarn: warn,\\n\\t\\tcrit: crit,\\n\\t)\"}\n",
body: "{\"flux\":\"import \\\"influxdata/influxdb/monitor\\\"\\nimport \\\"influxdata/influxdb/v1\\\"\\n\\ndata = from(bucket: \\\"foo\\\") |\\u003e range(start: -1h) |\\u003e filter(fn: (r) =\\u003e r._field == \\\"usage_idle\\\")\\n |\\u003e aggregateWindow(every: 1h, fn: mean, createEmpty: false)\\n\\noption task = {name: \\\"hello\\\", every: 1h}\\n\\ncheck = {_check_id: \\\"020f755c3c082000\\\", _check_name: \\\"hello\\\", _type: \\\"threshold\\\", tags: {aaa: \\\"vaaa\\\", bbb: \\\"vbbb\\\"}}\\nok = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003e 10.0\\ninfo = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003c 40.0\\nwarn = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0\\ncrit = (r) =\\u003e r[\\\"usage_idle\\\"] \\u003c 40.0 and r[\\\"usage_idle\\\"] \\u003e 10.0\\nmessageFn = (r) =\\u003e \\\"whoa! {check.yeah}\\\"\\n\\ndata |\\u003e v1[\\\"fieldsAsCols\\\"]() |\\u003e monitor[\\\"check\\\"](\\n data: check,\\n messageFn: messageFn,\\n ok: ok,\\n info: info,\\n warn: warn,\\n crit: crit,\\n)\"}\n",
},
},
}

View File

@ -2,10 +2,14 @@ package check_test
import (
"encoding/json"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"testing"
"time"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/stretchr/testify/require"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/flux/parser"
@ -282,3 +286,12 @@ func TestJSON(t *testing.T) {
t.Run(c.name, fn)
}
}
func mustFormatPackage(t *testing.T, pkg *ast.Package) string {
if len(pkg.Files) == 0 {
t.Fatal("package expected to have at least one file")
}
v, err := astutil.Format(pkg.Files[0])
require.NoError(t, err)
return v
}

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
@ -98,7 +99,7 @@ func (c Custom) sanitizeFlux(lang fluxlang.FluxLanguageService) (string, error)
}
})
return ast.Format(p), nil
return astutil.Format(p.Files[0])
}
func propertyHasValue(prop *ast.Property, key string, value string) bool {

View File

@ -6,7 +6,6 @@ import (
"testing"
"github.com/andreyvit/diff"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/parser"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/notification/check"
@ -28,10 +27,8 @@ import "influxdata/influxdb/v1"
data = from(bucket: "_tasks")
|> range(start: -1m)
|> filter(fn: (r) =>
(r._measurement == "runs"))
|> filter(fn: (r) =>
(r._field == "finishedAt"))
|> filter(fn: (r) => r._measurement == "runs")
|> filter(fn: (r) => r._field == "finishedAt")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1m, offset: 0s}
@ -110,13 +107,13 @@ data
ID: 10,
Name: "moo",
Query: influxdb.DashboardQuery{
Text: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))),
Text: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))),
},
},
},
wants: wants{
err: nil,
script: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))),
script: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))),
},
},
{
@ -126,13 +123,13 @@ data
ID: 10,
Name: "moo",
Query: influxdb.DashboardQuery{
Text: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000b"))),
Text: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000b"))),
},
},
},
wants: wants{
err: nil,
script: ast.Format(parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))),
script: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(validQuery, "000000000000000a"))),
},
},
{
@ -157,7 +154,7 @@ data
ID: 10,
Name: "moo",
Query: influxdb.DashboardQuery{
Text: ast.Format(parser.ParseSource(fmt.Sprintf(invalidTaskQuery, "000000000000000b"))),
Text: mustFormatPackage(t, parser.ParseSource(fmt.Sprintf(invalidTaskQuery, "000000000000000b"))),
},
},
},

View File

@ -6,6 +6,7 @@ import (
"strings"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/notification"
"github.com/influxdata/influxdb/v2/notification/flux"
@ -33,18 +34,18 @@ func (c Deadman) Type() string {
// GenerateFlux returns a flux script for the Deadman provided.
func (c Deadman) GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) {
p, err := c.GenerateFluxAST(lang)
f, err := c.GenerateFluxAST(lang)
if err != nil {
return "", err
}
return ast.Format(p), nil
return astutil.Format(f)
}
// GenerateFluxAST returns a flux AST for the deadman provided. If there
// are any errors in the flux that the user provided the function will return
// an error for each error found when the script is parsed.
func (c Deadman) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Package, error) {
func (c Deadman) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.File, error) {
p, err := query.Parse(lang, c.Query.Text)
if p == nil {
return nil, err
@ -69,7 +70,7 @@ func (c Deadman) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Packag
f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "experimental", "influxdata/influxdb/v1")...)
f.Body = append(f.Body, c.generateFluxASTBody()...)
return p, nil
return f, nil
}
func (c Deadman) generateFluxASTBody() []ast.Statement {

View File

@ -7,6 +7,8 @@ import (
"github.com/influxdata/influxdb/v2/notification"
"github.com/influxdata/influxdb/v2/notification/check"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDeadman_GenerateFlux(t *testing.T) {
@ -58,31 +60,20 @@ func TestDeadman_GenerateFlux(t *testing.T) {
},
},
wants: wants{
script: `package main
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "experimental"
import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -10m)
data = from(bucket: "foo") |> range(start: -10m)
option task = {name: "moo", every: 1h}
check = {
_check_id: "000000000000000a",
_check_name: "moo",
_type: "deadman",
tags: {aaa: "vaaa", bbb: "vbbb"},
}
info = (r) =>
(r["dead"])
messageFn = (r) =>
("whoa! {r[\"dead\"]}")
check = {_check_id: "000000000000000a", _check_name: "moo", _type: "deadman", tags: {aaa: "vaaa", bbb: "vbbb"}}
info = (r) => r["dead"]
messageFn = (r) => "whoa! {r[\"dead\"]}"
data
|> v1["fieldsAsCols"]()
|> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s))
|> monitor["check"](data: check, messageFn: messageFn, info: info)`,
data |> v1["fieldsAsCols"]() |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s))
|> monitor["check"](data: check, messageFn: messageFn, info: info)`,
},
},
{
@ -121,31 +112,20 @@ data
},
},
wants: wants{
script: `package main
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "experimental"
import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -10m)
data = from(bucket: "foo") |> range(start: -10m)
option task = {name: "moo", every: 1h}
check = {
_check_id: "000000000000000a",
_check_name: "moo",
_type: "deadman",
tags: {aaa: "vaaa", bbb: "vbbb"},
}
info = (r) =>
(r["dead"])
messageFn = (r) =>
("whoa! {r[\"dead\"]}")
check = {_check_id: "000000000000000a", _check_name: "moo", _type: "deadman", tags: {aaa: "vaaa", bbb: "vbbb"}}
info = (r) => r["dead"]
messageFn = (r) => "whoa! {r[\"dead\"]}"
data
|> v1["fieldsAsCols"]()
|> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s))
|> monitor["check"](data: check, messageFn: messageFn, info: info)`,
data |> v1["fieldsAsCols"]() |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s))
|> monitor["check"](data: check, messageFn: messageFn, info: info)`,
},
},
{
@ -184,33 +164,20 @@ data
},
},
wants: wants{
script: `package main
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "experimental"
import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -10m)
|> filter(fn: (r) =>
(r._field == "usage user"))
data = from(bucket: "foo") |> range(start: -10m) |> filter(fn: (r) => r._field == "usage user")
option task = {name: "moo", every: 1h}
check = {
_check_id: "000000000000000a",
_check_name: "moo",
_type: "deadman",
tags: {aaa: "vaaa", bbb: "vbbb"},
}
info = (r) =>
(r["dead"])
messageFn = (r) =>
("whoa! {r[\"dead\"]}")
check = {_check_id: "000000000000000a", _check_name: "moo", _type: "deadman", tags: {aaa: "vaaa", bbb: "vbbb"}}
info = (r) => r["dead"]
messageFn = (r) => "whoa! {r[\"dead\"]}"
data
|> v1["fieldsAsCols"]()
|> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s))
|> monitor["check"](data: check, messageFn: messageFn, info: info)`,
data |> v1["fieldsAsCols"]() |> monitor["deadman"](t: experimental["subDuration"](from: now(), d: 60s))
|> monitor["check"](data: check, messageFn: messageFn, info: info)`,
},
},
}
@ -218,13 +185,8 @@ data
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := tt.args.deadman.GenerateFlux(fluxlang.DefaultService)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if exp, got := tt.wants.script, s; exp != got {
t.Errorf("expected:\n%v\n\ngot:\n%v\n", exp, got)
}
require.NoError(t, err)
assert.Equal(t, tt.wants.script, s)
})
}

View File

@ -6,6 +6,7 @@ import (
"strings"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/notification"
@ -107,18 +108,18 @@ func multiError(errs []error) error {
// are any errors in the flux that the user provided the function will return
// an error for each error found when the script is parsed.
func (t Threshold) GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) {
p, err := t.GenerateFluxAST(lang)
f, err := t.GenerateFluxAST(lang)
if err != nil {
return "", err
}
return ast.Format(p), nil
return astutil.Format(f)
}
// GenerateFluxAST returns a flux AST for the threshold provided. If there
// are any errors in the flux that the user provided the function will return
// an error for each error found when the script is parsed.
func (t Threshold) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Package, error) {
func (t Threshold) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.File, error) {
p, err := query.Parse(lang, t.Query.Text)
if p == nil {
return nil, err
@ -148,7 +149,7 @@ func (t Threshold) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Pack
f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "influxdata/influxdb/v1")...)
f.Body = append(f.Body, t.generateFluxASTBody(fields[0])...)
return p, nil
return f, nil
}
// TODO(desa): we'll likely want something slightly more sophisitcated long term, but this should work for now.

View File

@ -3,12 +3,12 @@ package check_test
import (
"testing"
"github.com/influxdata/flux/ast"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/notification"
"github.com/influxdata/influxdb/v2/notification/check"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestThreshold_GenerateFlux(t *testing.T) {
@ -77,45 +77,29 @@ func TestThreshold_GenerateFlux(t *testing.T) {
},
},
wants: wants{
script: `package main
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -1h)
|> filter(fn: (r) =>
(r._field == "usage_user"))
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage_user")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1h}
check = {
_check_id: "000000000000000a",
_check_name: "moo",
_type: "threshold",
tags: {aaa: "vaaa", bbb: "vbbb"},
}
ok = (r) =>
(r["usage_user"] > 10.0)
info = (r) =>
(r["usage_user"] < 40.0)
warn = (r) =>
(r["usage_user"] < 40.0 and r["usage_user"] > 10.0)
crit = (r) =>
(r["usage_user"] < 10.0 or r["usage_user"] > 40.0)
messageFn = (r) =>
("whoa! {r[\"usage_user\"]}")
check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}}
ok = (r) => r["usage_user"] > 10.0
info = (r) => r["usage_user"] < 40.0
warn = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0
crit = (r) => r["usage_user"] < 10.0 or r["usage_user"] > 40.0
messageFn = (r) => "whoa! {r[\"usage_user\"]}"
data
|> v1["fieldsAsCols"]()
|> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
data |> v1["fieldsAsCols"]() |> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
},
},
{
@ -168,45 +152,29 @@ data
},
},
wants: wants{
script: `package main
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -1h)
|> filter(fn: (r) =>
(r._field == "usage_user"))
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage_user")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1h}
check = {
_check_id: "000000000000000a",
_check_name: "moo",
_type: "threshold",
tags: {aaa: "vaaa", bbb: "vbbb"},
}
ok = (r) =>
(r["usage_user"] > 10.0)
info = (r) =>
(r["usage_user"] < 40.0)
warn = (r) =>
(r["usage_user"] < 40.0 and r["usage_user"] > 10.0)
crit = (r) =>
(r["usage_user"] < 10.0 or r["usage_user"] > 40.0)
messageFn = (r) =>
("whoa! {r[\"usage_user\"]}")
check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}}
ok = (r) => r["usage_user"] > 10.0
info = (r) => r["usage_user"] < 40.0
warn = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0
crit = (r) => r["usage_user"] < 10.0 or r["usage_user"] > 40.0
messageFn = (r) => "whoa! {r[\"usage_user\"]}"
data
|> v1["fieldsAsCols"]()
|> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
data |> v1["fieldsAsCols"]() |> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
},
},
{
@ -259,45 +227,29 @@ data
},
},
wants: wants{
script: `package main
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -1h)
|> filter(fn: (r) =>
(r._field == "usage user"))
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage user")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1h}
check = {
_check_id: "000000000000000a",
_check_name: "moo",
_type: "threshold",
tags: {aaa: "vaaa", bbb: "vbbb"},
}
ok = (r) =>
(r["usage user"] > 10.0)
info = (r) =>
(r["usage user"] < 40.0)
warn = (r) =>
(r["usage user"] < 40.0 and r["usage user"] > 10.0)
crit = (r) =>
(r["usage user"] < 10.0 or r["usage user"] > 40.0)
messageFn = (r) =>
("whoa! {r[\"usage user\"]}")
check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}}
ok = (r) => r["usage user"] > 10.0
info = (r) => r["usage user"] < 40.0
warn = (r) => r["usage user"] < 40.0 and r["usage user"] > 10.0
crit = (r) => r["usage user"] < 10.0 or r["usage user"] > 40.0
messageFn = (r) => "whoa! {r[\"usage user\"]}"
data
|> v1["fieldsAsCols"]()
|> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
data |> v1["fieldsAsCols"]() |> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
},
},
{
@ -350,59 +302,38 @@ data
},
},
wants: wants{
script: `package main
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
data = from(bucket: "foo")
|> range(start: -1h)
|> filter(fn: (r) =>
(r._field == "usage_user"))
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
data = from(bucket: "foo") |> range(start: -1h) |> filter(fn: (r) => r._field == "usage_user")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
option task = {name: "moo", every: 1h}
check = {
_check_id: "000000000000000a",
_check_name: "moo",
_type: "threshold",
tags: {aaa: "vaaa", bbb: "vbbb"},
}
ok = (r) =>
(r["usage_user"] > 10.0)
info = (r) =>
(r["usage_user"] < 40.0)
warn = (r) =>
(r["usage_user"] < 40.0 and r["usage_user"] > 10.0)
crit = (r) =>
(r["usage_user"] < 40.0 and r["usage_user"] > 10.0)
messageFn = (r) =>
("whoa! {r[\"usage_user\"]}")
check = {_check_id: "000000000000000a", _check_name: "moo", _type: "threshold", tags: {aaa: "vaaa", bbb: "vbbb"}}
ok = (r) => r["usage_user"] > 10.0
info = (r) => r["usage_user"] < 40.0
warn = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0
crit = (r) => r["usage_user"] < 40.0 and r["usage_user"] > 10.0
messageFn = (r) => "whoa! {r[\"usage_user\"]}"
data
|> v1["fieldsAsCols"]()
|> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
data |> v1["fieldsAsCols"]() |> monitor["check"](
data: check,
messageFn: messageFn,
ok: ok,
info: info,
warn: warn,
crit: crit,
)`,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// TODO(desa): change this to GenerateFlux() when we don't need to code
// around the monitor package not being available.
p, err := tt.args.threshold.GenerateFluxAST(fluxlang.DefaultService)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
assert.Equal(t, tt.wants.script, ast.Format(p))
s, err := tt.args.threshold.GenerateFlux(fluxlang.DefaultService)
require.NoError(t, err)
assert.Equal(t, tt.wants.script, s)
})
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/notification/flux"
@ -21,21 +22,12 @@ func (s *HTTP) GenerateFlux(e influxdb.NotificationEndpoint) (string, error) {
if !ok {
return "", fmt.Errorf("endpoint provided is a %s, not an HTTP endpoint", e.Type())
}
p, err := s.GenerateFluxAST(httpEndpoint)
if err != nil {
return "", err
}
return ast.Format(p), nil
return astutil.Format(s.GenerateFluxAST(httpEndpoint))
}
// GenerateFluxAST generates a flux AST for the http notification rule.
func (s *HTTP) GenerateFluxAST(e *endpoint.HTTP) (*ast.Package, error) {
f := flux.File(
s.Name,
s.imports(e),
s.generateFluxASTBody(e),
)
return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil
func (s *HTTP) GenerateFluxAST(e *endpoint.HTTP) *ast.File {
return flux.File(s.Name, s.imports(e), s.generateFluxASTBody(e))
}
func (s *HTTP) imports(e *endpoint.HTTP) []*ast.ImportDeclaration {

View File

@ -1,9 +1,12 @@
package rule_test
import (
"github.com/influxdata/influxdb/v2/kit/platform"
"testing"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/notification"
"github.com/influxdata/influxdb/v2/notification/endpoint"
@ -11,9 +14,7 @@ import (
)
func TestHTTP_GenerateFlux(t *testing.T) {
want := `package main
// foo
import "influxdata/influxdb/monitor"
want := `import "influxdata/influxdb/monitor"
import "http"
import "json"
import "experimental"
@ -22,26 +23,16 @@ option task = {name: "foo", every: 1h, offset: 1s}
headers = {"Content-Type": "application/json"}
endpoint = http["endpoint"](url: "http://localhost:7777")
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h)
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
all_statuses = crit
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
return {headers: headers, data: json["encode"](v: body)}
}))`
return {headers: headers, data: json["encode"](v: body)}
}))`
s := &rule.HTTP{
Base: rule.Base{
@ -79,9 +70,7 @@ all_statuses
}
func TestHTTP_GenerateFlux_basicAuth(t *testing.T) {
want := `package main
// foo
import "influxdata/influxdb/monitor"
want := `import "influxdata/influxdb/monitor"
import "http"
import "json"
import "experimental"
@ -91,26 +80,16 @@ option task = {name: "foo", every: 1h, offset: 1s}
headers = {"Content-Type": "application/json", "Authorization": http["basicAuth"](u: secrets["get"](key: "000000000000000e-username"), p: secrets["get"](key: "000000000000000e-password"))}
endpoint = http["endpoint"](url: "http://localhost:7777")
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h)
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
all_statuses = crit
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
return {headers: headers, data: json["encode"](v: body)}
}))`
return {headers: headers, data: json["encode"](v: body)}
}))`
s := &rule.HTTP{
Base: rule.Base{
ID: 1,
@ -154,9 +133,7 @@ all_statuses
}
func TestHTTP_GenerateFlux_bearer(t *testing.T) {
want := `package main
// foo
import "influxdata/influxdb/monitor"
want := `import "influxdata/influxdb/monitor"
import "http"
import "json"
import "experimental"
@ -166,26 +143,16 @@ option task = {name: "foo", every: 1h, offset: 1s}
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + secrets["get"](key: "000000000000000e-token")}
endpoint = http["endpoint"](url: "http://localhost:7777")
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h)
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
all_statuses = crit
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
return {headers: headers, data: json["encode"](v: body)}
}))`
return {headers: headers, data: json["encode"](v: body)}
}))`
s := &rule.HTTP{
Base: rule.Base{
@ -227,9 +194,7 @@ all_statuses
}
func TestHTTP_GenerateFlux_bearer_every_second(t *testing.T) {
want := `package main
// foo
import "influxdata/influxdb/monitor"
want := `import "influxdata/influxdb/monitor"
import "http"
import "json"
import "experimental"
@ -239,26 +204,16 @@ option task = {name: "foo", every: 5s, offset: 1s}
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + secrets["get"](key: "000000000000000e-token")}
endpoint = http["endpoint"](url: "http://localhost:7777")
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -10s)
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
all_statuses = crit
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 5s)))
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 5s))
all_statuses
|> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
all_statuses |> monitor["notify"](data: notification, endpoint: endpoint(mapFn: (r) => {
body = {r with _version: 1}
return {headers: headers, data: json["encode"](v: body)}
}))`
return {headers: headers, data: json["encode"](v: body)}
}))`
s := &rule.HTTP{
Base: rule.Base{
@ -290,11 +245,6 @@ all_statuses
}
f, err := s.GenerateFlux(e)
if err != nil {
t.Fatal(err)
}
if f != want {
t.Errorf("scripts did not match. want:\n%v\n\ngot:\n%v", want, f)
}
require.NoError(t, err)
assert.Equal(t, want, f)
}

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/flux/ast"
@ -57,21 +58,16 @@ func (s *PagerDuty) GenerateFlux(e influxdb.NotificationEndpoint) (string, error
if !ok {
return "", fmt.Errorf("endpoint provided is a %s, not an PagerDuty endpoint", e.Type())
}
p, err := s.GenerateFluxAST(pagerdutyEndpoint)
if err != nil {
return "", err
}
return ast.Format(p), nil
return astutil.Format(s.GenerateFluxAST(pagerdutyEndpoint))
}
// GenerateFluxAST generates a flux AST for the pagerduty notification rule.
func (s *PagerDuty) GenerateFluxAST(e *endpoint.PagerDuty) (*ast.Package, error) {
f := flux.File(
func (s *PagerDuty) GenerateFluxAST(e *endpoint.PagerDuty) *ast.File {
return flux.File(
s.Name,
flux.Imports("influxdata/influxdb/monitor", "pagerduty", "influxdata/influxdb/secrets", "experimental"),
s.generateFluxASTBody(e),
)
return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil
}
func (s *PagerDuty) generateFluxASTBody(e *endpoint.PagerDuty) []ast.Statement {

View File

@ -59,9 +59,7 @@ func TestPagerDuty_GenerateFlux(t *testing.T) {
},
},
},
script: `package main
// foo
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "pagerduty"
import "influxdata/influxdb/secrets"
import "experimental"
@ -70,35 +68,23 @@ option task = {name: "foo", every: 1h}
pagerduty_secret = secrets["get"](key: "pagerduty_token")
pagerduty_endpoint = pagerduty["endpoint"]()
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
all_statuses = crit
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) =>
({
routingKey: pagerduty_secret,
client: "influxdata",
clientURL: "http://localhost:7777/host/${r.host}",
class: r._check_name,
group: r["_source_measurement"],
severity: pagerduty["severityFromLevel"](level: r["_level"]),
eventAction: pagerduty["actionFromLevel"](level: r["_level"]),
source: notification["_notification_rule_name"],
summary: r["_message"],
timestamp: time(v: r["_source_timestamp"]),
})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => ({
routingKey: pagerduty_secret,
client: "influxdata",
clientURL: "http://localhost:7777/host/${r.host}",
class: r._check_name,
group: r["_source_measurement"],
severity: pagerduty["severityFromLevel"](level: r["_level"]),
eventAction: pagerduty["actionFromLevel"](level: r["_level"]),
source: notification["_notification_rule_name"],
summary: r["_message"],
timestamp: time(v: r["_source_timestamp"]),
})))`,
},
{
name: "notify on info to crit",
@ -143,9 +129,7 @@ all_statuses
},
},
},
script: `package main
// foo
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "pagerduty"
import "influxdata/influxdb/secrets"
import "experimental"
@ -154,34 +138,23 @@ option task = {name: "foo", every: 1h}
pagerduty_secret = secrets["get"](key: "pagerduty_token")
pagerduty_endpoint = pagerduty["endpoint"]()
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
info_to_crit = statuses
|> monitor["stateChanges"](fromLevel: "info", toLevel: "crit")
all_statuses = info_to_crit
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
info_to_crit = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "crit")
all_statuses = info_to_crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) =>
({
routingKey: pagerduty_secret,
client: "influxdata",
clientURL: "http://localhost:7777/host/${r.host}",
class: r._check_name,
group: r["_source_measurement"],
severity: pagerduty["severityFromLevel"](level: r["_level"]),
eventAction: pagerduty["actionFromLevel"](level: r["_level"]),
source: notification["_notification_rule_name"],
summary: r["_message"],
timestamp: time(v: r["_source_timestamp"]),
})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => ({
routingKey: pagerduty_secret,
client: "influxdata",
clientURL: "http://localhost:7777/host/${r.host}",
class: r._check_name,
group: r["_source_measurement"],
severity: pagerduty["severityFromLevel"](level: r["_level"]),
eventAction: pagerduty["actionFromLevel"](level: r["_level"]),
source: notification["_notification_rule_name"],
summary: r["_message"],
timestamp: time(v: r["_source_timestamp"]),
})))`,
},
{
name: "notify on crit or ok to warn",
@ -229,9 +202,7 @@ all_statuses
},
},
},
script: `package main
// foo
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "pagerduty"
import "influxdata/influxdb/secrets"
import "experimental"
@ -240,38 +211,24 @@ option task = {name: "foo", every: 1h}
pagerduty_secret = secrets["get"](key: "pagerduty_token")
pagerduty_endpoint = pagerduty["endpoint"]()
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
ok_to_warn = statuses
|> monitor["stateChanges"](fromLevel: "ok", toLevel: "warn")
all_statuses = union(tables: [crit, ok_to_warn])
|> sort(columns: ["_time"])
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
ok_to_warn = statuses |> monitor["stateChanges"](fromLevel: "ok", toLevel: "warn")
all_statuses = union(tables: [crit, ok_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) =>
({
routingKey: pagerduty_secret,
client: "influxdata",
clientURL: "http://localhost:7777/host/${r.host}",
class: r._check_name,
group: r["_source_measurement"],
severity: pagerduty["severityFromLevel"](level: r["_level"]),
eventAction: pagerduty["actionFromLevel"](level: r["_level"]),
source: notification["_notification_rule_name"],
summary: r["_message"],
timestamp: time(v: r["_source_timestamp"]),
})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: pagerduty_endpoint(mapFn: (r) => ({
routingKey: pagerduty_secret,
client: "influxdata",
clientURL: "http://localhost:7777/host/${r.host}",
class: r._check_name,
group: r["_source_measurement"],
severity: pagerduty["severityFromLevel"](level: r["_level"]),
eventAction: pagerduty["actionFromLevel"](level: r["_level"]),
source: notification["_notification_rule_name"],
summary: r["_message"],
timestamp: time(v: r["_source_timestamp"]),
})))`,
},
}

View File

@ -11,7 +11,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/flux/ast"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/mock"
@ -295,7 +295,7 @@ func CreateNotificationRule(
OwnerID: MustIDBase16("020f755c3c082005"),
Name: "name2",
Status: "active",
Flux: "package main\n// name2\nimport \"influxdata/influxdb/monitor\"\nimport \"slack\"\nimport \"influxdata/influxdb/secrets\"\nimport \"experimental\"\n\noption task = {name: \"name2\", every: 1h}\n\nslack_secret = secrets[\"get\"](key: \"020f755c3c082001-token\")\nslack_endpoint = slack[\"endpoint\"](token: slack_secret, url: \"http://localhost:7777\")\nnotification = {\n\t_notification_rule_id: \"020f755c3c082001\",\n\t_notification_rule_name: \"name2\",\n\t_notification_endpoint_id: \"020f755c3c082001\",\n\t_notification_endpoint_name: \"foo\",\n}\nstatuses = monitor[\"from\"](start: -2h, fn: (r) =>\n\t(r[\"k1\"] == \"v1\" and r[\"k2\"] == \"v2\"))\ncrit = statuses\n\t|> filter(fn: (r) =>\n\t\t(r[\"_level\"] == \"crit\"))\nall_statuses = crit\n\t|> filter(fn: (r) =>\n\t\t(r[\"_time\"] >= experimental[\"subDuration\"](from: now(), d: 1h)))\n\nall_statuses\n\t|> monitor[\"notify\"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>\n\t\t({channel: \"\", text: \"msg1\", color: if r[\"_level\"] == \"crit\" then \"danger\" else if r[\"_level\"] == \"warn\" then \"warning\" else \"good\"})))",
Flux: "import \"influxdata/influxdb/monitor\"\nimport \"slack\"\nimport \"influxdata/influxdb/secrets\"\nimport \"experimental\"\n\noption task = {name: \"name2\", every: 1h}\n\nslack_secret = secrets[\"get\"](key: \"020f755c3c082001-token\")\nslack_endpoint = slack[\"endpoint\"](token: slack_secret, url: \"http://localhost:7777\")\nnotification = {_notification_rule_id: \"020f755c3c082001\", _notification_rule_name: \"name2\", _notification_endpoint_id: \"020f755c3c082001\", _notification_endpoint_name: \"foo\"}\nstatuses = monitor[\"from\"](start: -2h, fn: (r) => r[\"k1\"] == \"v1\" and r[\"k2\"] == \"v2\")\ncrit = statuses |> filter(fn: (r) => r[\"_level\"] == \"crit\")\nall_statuses = crit |> filter(fn: (r) => r[\"_time\"] >= experimental[\"subDuration\"](from: now(), d: 1h))\n\nall_statuses |> monitor[\"notify\"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: \"\", text: \"msg1\", color: if r[\"_level\"] == \"crit\" then \"danger\" else if r[\"_level\"] == \"warn\" then \"warning\" else \"good\"})))",
Every: "1h",
},
},

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/flux/ast"
@ -25,21 +26,16 @@ func (s *Slack) GenerateFlux(e influxdb.NotificationEndpoint) (string, error) {
if !ok {
return "", fmt.Errorf("endpoint provided is a %s, not an Slack endpoint", e.Type())
}
p, err := s.GenerateFluxAST(slackEndpoint)
if err != nil {
return "", err
}
return ast.Format(p), nil
return astutil.Format(s.GenerateFluxAST(slackEndpoint))
}
// GenerateFluxAST generates a flux AST for the slack notification rule.
func (s *Slack) GenerateFluxAST(e *endpoint.Slack) (*ast.Package, error) {
f := flux.File(
func (s *Slack) GenerateFluxAST(e *endpoint.Slack) *ast.File {
return flux.File(
s.Name,
flux.Imports("influxdata/influxdb/monitor", "slack", "influxdata/influxdb/secrets", "experimental"),
s.generateFluxASTBody(e),
)
return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil
}
func (s *Slack) generateFluxASTBody(e *endpoint.Slack) []ast.Statement {

View File

@ -1,9 +1,10 @@
package rule_test
import (
"github.com/influxdata/influxdb/v2/kit/platform"
"testing"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/parser"
"github.com/influxdata/influxdb/v2"
@ -39,9 +40,7 @@ func TestSlack_GenerateFlux(t *testing.T) {
}{
{
name: "with any status",
want: `package main
// foo
import "influxdata/influxdb/monitor"
want: `import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
@ -49,24 +48,12 @@ import "experimental"
option task = {name: "foo", every: 1h}
slack_endpoint = slack["endpoint"](url: "http://localhost:7777")
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
any = statuses
|> filter(fn: (r) =>
(true))
all_statuses = any
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
any = statuses |> filter(fn: (r) => true)
all_statuses = any |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>
({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
rule: &rule.Slack{
Channel: "bar",
MessageTemplate: "blah",
@ -108,9 +95,7 @@ all_statuses
},
{
name: "with url",
want: `package main
// foo
import "influxdata/influxdb/monitor"
want: `import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
@ -118,27 +103,13 @@ import "experimental"
option task = {name: "foo", every: 1h}
slack_endpoint = slack["endpoint"](url: "http://localhost:7777")
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
info_to_warn = statuses
|> monitor["stateChanges"](fromLevel: "info", toLevel: "warn")
all_statuses = union(tables: [crit, info_to_warn])
|> sort(columns: ["_time"])
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
info_to_warn = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn")
all_statuses = union(tables: [crit, info_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>
({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
rule: &rule.Slack{
Channel: "bar",
MessageTemplate: "blah",
@ -184,9 +155,7 @@ all_statuses
},
{
name: "with token",
want: `package main
// foo
import "influxdata/influxdb/monitor"
want: `import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
@ -195,27 +164,13 @@ option task = {name: "foo", every: 1h}
slack_secret = secrets["get"](key: "slack_token")
slack_endpoint = slack["endpoint"](token: slack_secret)
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
info_to_warn = statuses
|> monitor["stateChanges"](fromLevel: "info", toLevel: "warn")
all_statuses = union(tables: [crit, info_to_warn])
|> sort(columns: ["_time"])
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
info_to_warn = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn")
all_statuses = union(tables: [crit, info_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>
({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
rule: &rule.Slack{
Channel: "bar",
MessageTemplate: "blah",
@ -263,9 +218,7 @@ all_statuses
},
{
name: "with token and url",
want: `package main
// foo
import "influxdata/influxdb/monitor"
want: `import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
@ -274,27 +227,13 @@ option task = {name: "foo", every: 1h}
slack_secret = secrets["get"](key: "slack_token")
slack_endpoint = slack["endpoint"](token: slack_secret, url: "http://localhost:7777")
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000002",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
info_to_warn = statuses
|> monitor["stateChanges"](fromLevel: "info", toLevel: "warn")
all_statuses = union(tables: [crit, info_to_warn])
|> sort(columns: ["_time"])
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000002", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
info_to_warn = statuses |> monitor["stateChanges"](fromLevel: "info", toLevel: "warn")
all_statuses = union(tables: [crit, info_to_warn]) |> sort(columns: ["_time"]) |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>
({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "bar", text: "blah", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))`,
rule: &rule.Slack{
Channel: "bar",
MessageTemplate: "blah",

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/flux/ast"
@ -26,21 +27,16 @@ func (s *Telegram) GenerateFlux(e influxdb.NotificationEndpoint) (string, error)
if !ok {
return "", fmt.Errorf("endpoint provided is a %s, not a Telegram endpoint", e.Type())
}
p, err := s.GenerateFluxAST(telegramEndpoint)
if err != nil {
return "", err
}
return ast.Format(p), nil
return astutil.Format(s.GenerateFluxAST(telegramEndpoint))
}
// GenerateFluxAST generates a flux AST for the telegram notification rule.
func (s *Telegram) GenerateFluxAST(e *endpoint.Telegram) (*ast.Package, error) {
f := flux.File(
func (s *Telegram) GenerateFluxAST(e *endpoint.Telegram) *ast.File {
return flux.File(
s.Name,
flux.Imports("influxdata/influxdb/monitor", "contrib/sranka/telegram", "influxdata/influxdb/secrets", "experimental"),
s.generateFluxASTBody(e),
)
return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil
}
func (s *Telegram) generateFluxASTBody(e *endpoint.Telegram) []ast.Statement {

View File

@ -1,9 +1,10 @@
package rule_test
import (
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"testing"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/andreyvit/diff"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/notification"
@ -102,9 +103,7 @@ func TestTelegram_GenerateFlux(t *testing.T) {
},
},
},
script: `package main
// foo
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "contrib/sranka/telegram"
import "influxdata/influxdb/secrets"
import "experimental"
@ -113,24 +112,12 @@ option task = {name: "foo", every: 1h}
telegram_secret = secrets["get"](key: "3-key")
telegram_endpoint = telegram["endpoint"](token: telegram_secret, disableWebPagePreview: false)
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000003",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
crit = statuses
|> filter(fn: (r) =>
(r["_level"] == "crit"))
all_statuses = crit
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000003", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) =>
({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) => ({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`,
},
{
name: "with DisableWebPagePreview and ParseMode",
@ -174,9 +161,7 @@ all_statuses
},
},
},
script: `package main
// foo
import "influxdata/influxdb/monitor"
script: `import "influxdata/influxdb/monitor"
import "contrib/sranka/telegram"
import "influxdata/influxdb/secrets"
import "experimental"
@ -185,24 +170,12 @@ option task = {name: "foo", every: 1h}
telegram_secret = secrets["get"](key: "3-key")
telegram_endpoint = telegram["endpoint"](token: telegram_secret, parseMode: "HTML", disableWebPagePreview: true)
notification = {
_notification_rule_id: "0000000000000001",
_notification_rule_name: "foo",
_notification_endpoint_id: "0000000000000003",
_notification_endpoint_name: "foo",
}
statuses = monitor["from"](start: -2h, fn: (r) =>
(r["foo"] == "bar" and r["baz"] == "bang"))
any = statuses
|> filter(fn: (r) =>
(true))
all_statuses = any
|> filter(fn: (r) =>
(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))
notification = {_notification_rule_id: "0000000000000001", _notification_rule_name: "foo", _notification_endpoint_id: "0000000000000003", _notification_endpoint_name: "foo"}
statuses = monitor["from"](start: -2h, fn: (r) => r["foo"] == "bar" and r["baz"] == "bang")
any = statuses |> filter(fn: (r) => true)
all_statuses = any |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))
all_statuses
|> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) =>
({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`,
all_statuses |> monitor["notify"](data: notification, endpoint: telegram_endpoint(mapFn: (r) => ({channel: "-12345", text: "blah", silent: if r["_level"] == "crit" then true else if r["_level"] == "warn" then true else false})))`,
},
}

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/flux/ast/edit"
"github.com/influxdata/flux/parser"
"github.com/influxdata/influxdb/v2"
@ -1304,7 +1305,11 @@ func (q query) DashboardQuery() string {
edit.SetOption(files[0], "task", tobj)
}
}
return ast.Format(files[0])
// TODO(danmoran): I'm not happy about ignoring this error, but pkger doesn't have adequate error return values
// in the callstack. In most cases errors are simply ignored and the desired output of the operation is skipped.
// If I were to change the contract here, a lot of other things would need to be changed.
s, _ := astutil.Format(files[0])
return s
}
type queries []query

View File

@ -2599,24 +2599,21 @@ spec:
// parmas
queryText := `option params = {
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
}
from(bucket: params.bucket)
|> range(start: params.start, stop: params.stop)
|> filter(fn: (r) =>
(r._measurement == "processes"))
|> filter(fn: (r) =>
(r.floater == params.floatVal))
|> filter(fn: (r) =>
(r._value > params.minVal))
|> aggregateWindow(every: v.windowPeriod, fn: max)
|> yield(name: params.name)`
|> range(start: params.start, stop: params.stop)
|> filter(fn: (r) => r._measurement == "processes")
|> filter(fn: (r) => r.floater == params.floatVal)
|> filter(fn: (r) => r._value > params.minVal)
|> aggregateWindow(every: v.windowPeriod, fn: max)
|> yield(name: params.name)`
q := props.Queries[0]
assert.Equal(t, queryText, q.Text)
@ -3599,24 +3596,21 @@ spec:
assert.Equal(t, "task-uuid", actual.MetaName)
queryText := `option params = {
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
bucket: "bar",
start: -24h0m0s,
stop: now(),
name: "max",
floatVal: 37.2,
minVal: 10,
}
from(bucket: params.bucket)
|> range(start: params.start, stop: params.stop)
|> filter(fn: (r) =>
(r._measurement == "processes"))
|> filter(fn: (r) =>
(r.floater == params.floatVal))
|> filter(fn: (r) =>
(r._value > params.minVal))
|> aggregateWindow(every: v.windowPeriod, fn: max)
|> yield(name: params.name)`
|> range(start: params.start, stop: params.stop)
|> filter(fn: (r) => r._measurement == "processes")
|> filter(fn: (r) => r.floater == params.floatVal)
|> filter(fn: (r) => r._value > params.minVal)
|> aggregateWindow(every: v.windowPeriod, fn: max)
|> yield(name: params.name)`
assert.Equal(t, queryText, actual.Query)
@ -3732,13 +3726,11 @@ from(bucket: params.bucket)
queryText := `option task = {name: "foo", every: 1m0s, offset: 1m0s}
from(bucket: "rucket_1")
|> range(start: -5d, stop: -1h)
|> filter(fn: (r) =>
(r._measurement == "cpu"))
|> filter(fn: (r) =>
(r._field == "usage_idle"))
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")`
|> range(start: -5d, stop: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_idle")
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")`
assert.Equal(t, queryText, actual[0].Query)
@ -3763,13 +3755,11 @@ from(bucket: "rucket_1")
queryText := `option params = {this: "foo"}
from(bucket: "rucket_1")
|> range(start: -5d, stop: -1h)
|> filter(fn: (r) =>
(r._measurement == params.this))
|> filter(fn: (r) =>
(r._field == "usage_idle"))
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")`
|> range(start: -5d, stop: -1h)
|> filter(fn: (r) => r._measurement == params.this)
|> filter(fn: (r) => r._field == "usage_idle")
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")`
assert.Equal(t, queryText, actual[0].Query)

View File

@ -32,9 +32,7 @@ func init() {
return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu`, name),
`package main
` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + `
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> ` + name + `()

View File

@ -8,9 +8,7 @@ func init() {
return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu WHERE host = 'server01'`, name),
`package main
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `)
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> filter(fn: (r) => r["host"] == "server01")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])

View File

@ -8,9 +8,7 @@ func init() {
return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu GROUP BY host`, name),
`package main
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `)
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field", "host"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "host", "_time", "_value"])
|> ` + name + `()

View File

@ -8,9 +8,7 @@ func init() {
return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu WHERE time >= now() - 10m GROUP BY time(1m)`, name),
`package main
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `)
|> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> window(every: 1m)

View File

@ -8,9 +8,7 @@ func init() {
return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu WHERE time >= now() - 10m GROUP BY time(5m, 12m)`, name),
`package main
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `)
|> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> window(every: 5m, start: 1970-01-01T00:02:00Z)

View File

@ -6,18 +6,14 @@ func init() {
`SELECT mean(value) FROM db0..cpu; SELECT max(value) FROM db0..cpu`,
`package main
from(bucketID: "")
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> mean()
|> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z}))
|> rename(columns: {_value: "mean"})
|> yield(name: "0")
from(bucketID: "")
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> max()

View File

@ -6,9 +6,7 @@ func init() {
`SELECT value FROM db0..cpu`,
`package main
from(bucketID: "")
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> rename(columns: {_value: "value"})

View File

@ -6,9 +6,7 @@ func init() {
`SELECT value FROM db0..cpu WHERE host = 'server01'`,
`package main
from(bucketID: "")
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> filter(fn: (r) => r["host"] == "server01")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])

View File

@ -6,9 +6,7 @@ func init() {
`SELECT value FROM db0..cpu WHERE host =~ /.*er01/`,
`package main
from(bucketID: "")
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> filter(fn: (r) => r["host"] =~ /.*er01/)
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])

View File

@ -8,9 +8,7 @@ func init() {
`SELECT value FROM db0.alternate.cpu`,
`package main
`+fmt.Sprintf(`from(bucketID: "%s")`, altBucketID.String())+`
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
`+fmt.Sprintf(`from(bucketID: "%s")`, altBucketID.String())+` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> rename(columns: {_value: "value"})

View File

@ -33,9 +33,7 @@ func init() {
return fmt.Sprintf(`SELECT %s(value) FROM db0..cpu`, name),
`package main
` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + `
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|> ` + name + `()

View File

@ -8,9 +8,7 @@ func init() {
import v1 "influxdata/influxdb/v1"
v1.databases()
|> rename(columns: {databaseName: "name"})
|> keep(columns: ["name"])
v1.databases() |> rename(columns: {databaseName: "name"}) |> keep(columns: ["name"])
|> yield(name: "0")
`,
),

View File

@ -8,9 +8,7 @@ func init() {
import v1 "influxdata/influxdb/v1"
v1.databases()
|> filter(fn: (r) => r.databaseName == "telegraf")
|> rename(columns: {retentionPolicy: "name", retentionPeriod: "duration"})
v1.databases() |> filter(fn: (r) => r.databaseName == "telegraf") |> rename(columns: {retentionPolicy: "name", retentionPeriod: "duration"})
|> set(key: "shardGroupDuration", value: "0")
|> set(key: "replicaN", value: "2")
|> keep(columns: ["name", "duration", "shardGroupDuration", "replicaN", "default"])

View File

@ -6,9 +6,7 @@ func init() {
`SHOW TAG VALUES ON "db0" WITH KEY = "host"`,
`package main
from(bucketID: "")
|> range(start: -1h)
|> keyValues(keyColumns: ["host"])
from(bucketID: "") |> range(start: -1h) |> keyValues(keyColumns: ["host"])
|> group(columns: ["_measurement", "_key"], mode: "by")
|> distinct()
|> group(columns: ["_measurement"], mode: "by")

View File

@ -6,9 +6,7 @@ func init() {
`SHOW TAG VALUES ON "db0" WITH KEY IN ("host", "region")`,
`package main
from(bucketID: "")
|> range(start: -1h)
|> keyValues(keyColumns: ["host", "region"])
from(bucketID: "") |> range(start: -1h) |> keyValues(keyColumns: ["host", "region"])
|> group(columns: ["_measurement", "_key"], mode: "by")
|> distinct()
|> group(columns: ["_measurement"], mode: "by")

View File

@ -6,9 +6,7 @@ func init() {
`SHOW TAG VALUES ON "db0" FROM "cpu", "mem", "gpu" WITH KEY = "host"`,
`package main
from(bucketID: "")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" or (r._measurement == "mem" or r._measurement == "gpu"))
from(bucketID: "") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "cpu" or (r._measurement == "mem" or r._measurement == "gpu"))
|> keyValues(keyColumns: ["host"])
|> group(columns: ["_measurement", "_key"], mode: "by")
|> distinct()

View File

@ -7,7 +7,9 @@ import (
"testing"
"time"
"github.com/influxdata/flux/ast/astutil"
platform2 "github.com/influxdata/influxdb/v2/kit/platform"
"github.com/stretchr/testify/require"
"github.com/andreyvit/diff"
"github.com/influxdata/flux/ast"
@ -84,7 +86,8 @@ func (f *fixture) Run(t *testing.T) {
err := ast.GetError(wantAST)
t.Fatalf("found parser errors in the want text: %s", err.Error())
}
want := ast.Format(wantAST)
want, err := astutil.Format(wantAST.Files[0])
require.NoError(t, err)
transpiler := influxql.NewTranspilerWithConfig(
dbrpMappingSvc,
@ -98,7 +101,8 @@ func (f *fixture) Run(t *testing.T) {
if err != nil {
t.Fatalf("%s:%d: unexpected error: %s", f.file, f.line, err)
}
got := ast.Format(pkg)
got, err := astutil.Format(pkg.Files[0])
require.NoError(t, err)
// Encode both of these to JSON and compare the results.
if want != got {

View File

@ -116,7 +116,7 @@ func TestDeduplicateRuns(t *testing.T) {
},
FindRunsFn: func(context.Context, taskmodel.RunFilter) ([]*taskmodel.Run, int, error) {
return []*taskmodel.Run{
&taskmodel.Run{ID: 2, Status: "started"},
{ID: 2, Status: "started"},
}, 1, nil
},
}

View File

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
@ -46,7 +47,18 @@ type Duration struct {
}
func (a Duration) String() string {
return ast.Format(&a.Node)
// NOTE: This is a copy of `formatDurationLiteral` from the flux codebase.
// We copy it here so we can break the dependency on the Go formatter in this method without a change in behavior.
// The Rust-based formatter doesn't expose an interface for formatting individual nodes.
builder := strings.Builder{}
formatDuration := func(d ast.Duration) {
builder.WriteString(strconv.FormatInt(d.Magnitude, 10))
builder.WriteString(d.Unit)
}
for _, d := range a.Node.Values {
formatDuration(d)
}
return builder.String()
}
// Parse parses a string into a Duration.

View File

@ -415,7 +415,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
// Update task: just update an option.
newStatus = string(taskmodel.TaskActive)
newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tcron: \"* * * * *\",\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")"
newFlux = "option task = {name: \"task-changed #98\", cron: \"* * * * *\", offset: 5s, concurrency: 100}\n\n// This comment should persist.\nfrom(bucket: \"b\")\n |> to(bucket: \"two\", orgID: \"000000000000000\")"
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, taskmodel.TaskUpdate{Options: options.Options{Name: "task-changed #98"}})
if err != nil {
t.Fatal(err)
@ -430,7 +430,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
// Update task: switch to every.
newStatus = string(taskmodel.TaskActive)
newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")"
newFlux = "option task = {name: \"task-changed #98\", every: 30s, offset: 5s, concurrency: 100}\n\n// This comment should persist.\nfrom(bucket: \"b\")\n |> to(bucket: \"two\", orgID: \"000000000000000\")"
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, taskmodel.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}})
if err != nil {
t.Fatal(err)
@ -497,15 +497,10 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
func testTaskFindTasksPaging(t *testing.T, sys *System) {
script := `option task = {
name: "Task %03d",
cron: "* * * * *",
concurrency: 100,
offset: 10s,
}
script := `option task = {name: "Task %03d", cron: "* * * * *", concurrency: 100, offset: 10s}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
cr := creds(t, sys)
@ -554,15 +549,10 @@ from(bucket: "b")
func testTaskFindTasksAfterPaging(t *testing.T, sys *System) {
var (
script = `option task = {
name: "some-unique-task-name",
cron: "* * * * *",
concurrency: 100,
offset: 10s,
}
script = `option task = {name: "some-unique-task-name", cron: "* * * * *", concurrency: 100, offset: 10s}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
|> to(bucket: "two", orgID: "000000000000000")`
cr = creds(t, sys)
tc = taskmodel.TaskCreate{
OrganizationID: cr.OrgID,
@ -636,15 +626,10 @@ from(bucket: "b")
//Retrieve the task again to ensure the options are now Every, without Cron or Offset
func testTaskOptionsUpdateFull(t *testing.T, sys *System) {
script := `option task = {
name: "task-Options-Update",
cron: "* * * * *",
concurrency: 100,
offset: 10s,
}
script := `option task = {name: "task-Options-Update", cron: "* * * * *", concurrency: 100, offset: 10s}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
|> to(bucket: "two", orgID: "000000000000000")`
cr := creds(t, sys)
@ -662,7 +647,7 @@ from(bucket: "b")
expectedFlux := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
|> to(bucket: "two", orgID: "000000000000000")`
f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, taskmodel.TaskUpdate{Options: options.Options{Offset: &options.Duration{}, Every: *(options.MustParseDuration("10s"))}})
if err != nil {
t.Fatal(err)
@ -677,15 +662,10 @@ from(bucket: "b")
}
})
t.Run("update task with different offset option", func(t *testing.T) {
expectedFlux := `option task = {
name: "task-Options-Update",
every: 10s,
concurrency: 100,
offset: 10s,
}
expectedFlux := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100, offset: 10s}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
|> to(bucket: "two", orgID: "000000000000000")`
f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, taskmodel.TaskUpdate{Options: options.Options{Offset: options.MustParseDuration("10s")}})
if err != nil {
t.Fatal(err)
@ -699,14 +679,10 @@ from(bucket: "b")
t.Fatalf("flux unexpected updated: %s", diff)
}
withoutOffset := `option task = {
name: "task-Options-Update",
every: 10s,
concurrency: 100,
}
withoutOffset := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
|> to(bucket: "two", orgID: "000000000000000")`
fNoOffset, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, taskmodel.TaskUpdate{Flux: &withoutOffset})
if err != nil {
t.Fatal(err)
@ -1804,25 +1780,17 @@ func creds(t *testing.T, s *System) TestCreds {
}
const (
scriptFmt = `option task = {
name: "task #%d",
cron: "* * * * *",
offset: 5s,
concurrency: 100,
}
scriptFmt = `option task = {name: "task #%d", cron: "* * * * *", offset: 5s, concurrency: 100}
// This comment should persist.
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
|> to(bucket: "two", orgID: "000000000000000")`
scriptDifferentName = `option task = {
name: "task-changed #%d",
cron: "* * * * *",
offset: 5s,
concurrency: 100,
}
scriptDifferentName = `option task = {name: "task-changed #%d", cron: "* * * * *", offset: 5s, concurrency: 100}
// This comment should persist.
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
|> to(bucket: "two", orgID: "000000000000000")`
)
func testTaskType(t *testing.T, sys *System) {

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/astutil"
"github.com/influxdata/flux/ast/edit"
"github.com/influxdata/influxdb/v2/kit/platform"
errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
@ -401,7 +402,10 @@ func (t *TaskUpdate) updateFlux(parser fluxlang.FluxLanguageService, oldFlux str
}
t.Options.Clear()
s := ast.Format(parsed)
s, err := astutil.Format(parsed)
if err != nil {
return err
}
t.Flux = &s
}
return nil

View File

@ -67,8 +67,7 @@ func TestOptionsEditWithAST(t *testing.T) {
t.Run("fmt string", func(t *testing.T) {
expected := `option task = {every: 10s, name: "foo"}
from(bucket: "x")
|> range(start: -1h)`
from(bucket: "x") |> range(start: -1h)`
if *tu.Flux != expected {
t.Errorf("got the wrong task back, expected %s,\n got %s\n diff: %s", expected, *tu.Flux, cmp.Diff(expected, *tu.Flux))
}
@ -136,8 +135,7 @@ from(bucket: "x")
tu.Options.Offset = &options.Duration{}
expscript := `option task = {cron: "* * * * *", name: "foo"}
from(bucket: "x")
|> range(start: -1h)`
from(bucket: "x") |> range(start: -1h)`
if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil {
t.Fatal(err)
}