Merge pull request #1692 from influxdata/flux-staging

chore: update Flux dependency
Nathaniel Cook 2018-12-04 13:07:03 -07:00 committed by GitHub
commit 3e8189a728
31 changed files with 90 additions and 49 deletions

go.mod (2 changed lines)
View File

@@ -77,7 +77,7 @@ require (
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
-github.com/influxdata/flux v0.7.1
+github.com/influxdata/flux v0.7.2
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect

go.sum (4 changed lines)
View File

@@ -213,8 +213,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/influxdata/flux v0.7.1 h1:eEQ/AfF8ToqXpNAg2ktS+YfRmOA+ilEMywbH8bqmmKM=
-github.com/influxdata/flux v0.7.1/go.mod h1:MIjvKpiQLRad9/ilY4jYwpIpMAhiOycJfK9YctCUGUM=
+github.com/influxdata/flux v0.7.2 h1:E8xcBU4asyTtMm0X8rNksfgSn3bjrJVHX+H4xFwjb3w=
+github.com/influxdata/flux v0.7.2/go.mod h1:MIjvKpiQLRad9/ilY4jYwpIpMAhiOycJfK9YctCUGUM=
github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=

View File

@@ -299,9 +299,9 @@ func TestFluxHandler_postFluxPlan(t *testing.T) {
{
name: "get plan from()",
w: httptest.NewRecorder(),
-r: httptest.NewRequest("POST", "/api/v2/query/plan", bytes.NewBufferString(`{"query": "from(bucket:\"telegraf\")|> range(start: -5000h)|> filter(fn: (r) => r._measurement == \"mem\" AND r._field == \"used_percent\")|> group(by:[\"host\"])|> mean()"}`)),
+r: httptest.NewRequest("POST", "/api/v2/query/plan", bytes.NewBufferString(`{"query": "from(bucket:\"telegraf\")|> range(start: -5000h)|> filter(fn: (r) => r._measurement == \"mem\" AND r._field == \"used_percent\")|> group(columns:[\"host\"])|> mean()"}`)),
now: func() time.Time { return time.Unix(0, 0).UTC() },
want: `{"spec":{"operations":[{"kind":"from","id":"from0","spec":{"bucket":"telegraf"}},{"kind":"range","id":"range1","spec":{"start":"-5000h0m0s","stop":"now","timeColumn":"_time","startColumn":"_start","stopColumn":"_stop"}},{"kind":"filter","id":"filter2","spec":{"fn":{"type":"FunctionExpression","block":{"type":"FunctionBlock","parameters":{"type":"FunctionParameters","list":[{"type":"FunctionParameter","key":{"type":"Identifier","name":"r"}}],"pipe":null},"body":{"type":"LogicalExpression","operator":"and","left":{"type":"BinaryExpression","operator":"==","left":{"type":"MemberExpression","object":{"type":"IdentifierExpression","name":"r"},"property":"_measurement"},"right":{"type":"StringLiteral","value":"mem"}},"right":{"type":"BinaryExpression","operator":"==","left":{"type":"MemberExpression","object":{"type":"IdentifierExpression","name":"r"},"property":"_field"},"right":{"type":"StringLiteral","value":"used_percent"}}}}}}},{"kind":"group","id":"group3","spec":{"by":["host"],"except":null,"all":false,"none":false}},{"kind":"mean","id":"mean4","spec":{"columns":["_value"]}}],"edges":[{"parent":"from0","child":"range1"},{"parent":"range1","child":"filter2"},{"parent":"filter2","child":"group3"},{"parent":"group3","child":"mean4"}],"resources":{"priority":"high","concurrency_quota":0,"memory_bytes_quota":0},"now":"1970-01-01T00:00:00Z"},"logical":{"roots":[{"Spec":{"Name":"_result"}}],"resources":{"priority":"high","concurrency_quota":1,"memory_bytes_quota":9223372036854775807},"now":"1970-01-01T00:00:00Z"},"physical":{"roots":[{"Spec":{"Name":"_result"}}],"resources":{"priority":"high","concurrency_quota":1,"memory_bytes_quota":9223372036854775807},"now":"1970-01-01T00:00:00Z"}}
want: `{"spec":{"operations":[{"kind":"from","id":"from0","spec":{"bucket":"telegraf"}},{"kind":"range","id":"range1","spec":{"start":"-5000h0m0s","stop":"now","timeColumn":"_time","startColumn":"_start","stopColumn":"_stop"}},{"kind":"filter","id":"filter2","spec":{"fn":{"type":"FunctionExpression","block":{"type":"FunctionBlock","parameters":{"type":"FunctionParameters","list":[{"type":"FunctionParameter","key":{"type":"Identifier","name":"r"}}],"pipe":null},"body":{"type":"LogicalExpression","operator":"and","left":{"type":"BinaryExpression","operator":"==","left":{"type":"MemberExpression","object":{"type":"IdentifierExpression","name":"r"},"property":"_measurement"},"right":{"type":"StringLiteral","value":"mem"}},"right":{"type":"BinaryExpression","operator":"==","left":{"type":"MemberExpression","object":{"type":"IdentifierExpression","name":"r"},"property":"_field"},"right":{"type":"StringLiteral","value":"used_percent"}}}}}}},{"kind":"group","id":"group3","spec":{"mode":"by","columns":["host"]}},{"kind":"mean","id":"mean4","spec":{"columns":["_value"]}}],"edges":[{"parent":"from0","child":"range1"},{"parent":"range1","child":"filter2"},{"parent":"filter2","child":"group3"},{"parent":"group3","child":"mean4"}],"resources":{"priority":"high","concurrency_quota":0,"memory_bytes_quota":0},"now":"1970-01-01T00:00:00Z"},"logical":{"roots":[{"Spec":{"Name":"_result"}}],"resources":{"priority":"high","concurrency_quota":1,"memory_bytes_quota":9223372036854775807},"now":"1970-01-01T00:00:00Z"},"physical":{"roots":[{"Spec":{"Name":"_result"}}],"resources":{"priority":"high","concurrency_quota":1,"memory_bytes_quota":9223372036854775807},"now":"1970-01-01T00:00:00Z"}}
`,
status: http.StatusOK,
},
@@ -370,7 +370,7 @@ func Test_postPlanRequest_Valid(t *testing.T) {
name: "request with query is valid",
fields: fields{
-Query: `from(bucket:"telegraf")|> range(start: -5000h)|> filter(fn: (r) => r._measurement == "mem" AND r._field == "used_percent")|> group(by:["host"])|> mean()`,
+Query: `from(bucket:"telegraf")|> range(start: -5000h)|> filter(fn: (r) => r._measurement == "mem" AND r._field == "used_percent")|> group(columns:["host"])|> mean()`,
},
},
{

View File

@@ -160,7 +160,7 @@ Since flux will group each series into its own table, we sometimes need to modif
```
from(bucket: "telegraf/autogen") |> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> group(by: ["region"])
|> group(columns: ["region"])
|> mean()
```
@@ -171,7 +171,7 @@ Similarly, if we wanted to group points into buckets of time, the `window` funct
```
from(bucket: "telegraf/autogen") |> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> group(by: ["region"])
|> group(columns: ["region"])
|> window(every: 1m)
|> mean()
```

View File

@@ -79,7 +79,7 @@ func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execu
SeriesOffset: spec.SeriesOffset,
Descending: spec.Descending,
OrderByTime: spec.OrderByTime,
-GroupMode: storage.GroupMode(spec.GroupMode),
+GroupMode: storage.ToGroupMode(spec.GroupMode),
GroupKeys: spec.GroupKeys,
AggregateMethod: spec.AggregateMethod,
},

View File

@@ -3,11 +3,13 @@ package storage
import (
"context"
"fmt"
"log"
"math"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/functions"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/platform"
"github.com/pkg/errors"
@@ -182,6 +184,20 @@ const (
GroupModeExcept
)
+// ToGroupMode accepts the group mode from Flux and produces the appropriate storage group mode.
+func ToGroupMode(fluxMode functions.GroupMode) GroupMode {
+	switch fluxMode {
+	case functions.GroupModeNone:
+		return GroupModeDefault
+	case functions.GroupModeBy:
+		return GroupModeBy
+	case functions.GroupModeExcept:
+		return GroupModeExcept
+	default:
+		panic(fmt.Sprint("unknown group mode: ", fluxMode))
+	}
+}
type ReadSpec struct {
OrganizationID platform.ID
BucketID platform.ID

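As a quick sanity check on the new mapping, here is a minimal table-driven sketch. This is a hypothetical test, not part of this change; it assumes it sits next to `ToGroupMode` in the `storage` package and uses only the mode constants visible in the diff above.

```
package storage

import (
	"testing"

	"github.com/influxdata/flux/functions"
)

// Each Flux group mode should map onto its storage counterpart,
// with GroupModeNone falling back to GroupModeDefault.
func TestToGroupMode(t *testing.T) {
	cases := []struct {
		flux functions.GroupMode
		want GroupMode
	}{
		{functions.GroupModeNone, GroupModeDefault},
		{functions.GroupModeBy, GroupModeBy},
		{functions.GroupModeExcept, GroupModeExcept},
	}
	for _, c := range cases {
		if got := ToGroupMode(c.flux); got != c.want {
			t.Errorf("ToGroupMode(%v) = %v, want %v", c.flux, got, c.want)
		}
	}
}
```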
View File

@@ -119,7 +119,7 @@ We group together the streams based on the `GROUP BY` clause. As an example:
```
> SELECT mean(usage_user) FROM telegraf..cpu WHERE time >= now() - 5m GROUP BY time(5m), host
-... |> group(by: ["_measurement", "_start", "host"]) |> window(every: 5m)
+... |> group(columns: ["_measurement", "_start", "host"]) |> window(every: 5m)
```
If there is no `GROUP BY time(...)`, `window()` is skipped. Grouping defaults to [`_measurement`, `_start`], regardless of whether a `GROUP BY` clause is present; any keys in the `GROUP BY` clause are concatenated with this default list. If a wildcard is used for grouping, this step is skipped entirely.
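In the transpiler itself, that default grouping is expressed with the new `Columns`/`Mode` fields, as the test fixtures in this change show. A small sketch (the helper name and package placement are hypothetical, and the `functions/transformations` import path is assumed):

```
package influxql

import "github.com/influxdata/flux/functions/transformations"

// defaultGroupSpec builds the grouping applied when no GROUP BY
// tags are present: group by _measurement and _start only.
func defaultGroupSpec() *transformations.GroupOpSpec {
	return &transformations.GroupOpSpec{
		Columns: []string{"_measurement", "_start"},
		Mode:    "by",
	}
}
```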
@@ -141,7 +141,7 @@ For an aggregate, the following is used instead:
```
> SELECT mean(usage_user) FROM telegraf..cpu
create_cursor(bucket: "telegraf/autogen", start: -5m, m: "cpu", f: "usage_user")
-|> group(except: ["_field"])
+|> group(columns: ["_field"], mode: "except")
|> mean(timeSrc: "_start", columns: ["_value"])
```
@@ -316,9 +316,9 @@ At this point, we have a table with the partition key that is organized by the k
We group by the measurement and the key and then use `distinct` on the values. After we find the distinct values, we group them by measurement again so that all of the tag values for a measurement end up together. We then rename the columns to the expected names.
```
-... |> group(by: ["_measurement", "_key"])
+... |> group(columns: ["_measurement", "_key"])
|> distinct(column: "_value")
|> group(by: ["_measurement"])
|> group(columns: ["_measurement"])
|> rename(columns: {_key: "key", _value: "value"})
```

View File

@@ -366,7 +366,8 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
// there is always something to group in influxql.
// TODO(jsternberg): A wildcard will skip this step.
id := t.op("group", &transformations.GroupOpSpec{
-By: tags,
+Columns: tags,
+Mode: "by",
}, in.ID())
if windowEvery > 0 {

View File

@@ -115,7 +115,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
&aggregate,

View File

@@ -105,7 +105,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
&aggregate,

View File

@@ -80,7 +80,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start", "host"},
Columns: []string{"_measurement", "_start", "host"},
Mode: "by",
},
},
&aggregate,

View File

@@ -82,7 +82,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -82,7 +82,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -80,7 +80,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{
@@ -158,7 +159,8 @@ func init() {
{
ID: "group1",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -80,7 +80,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{
@@ -201,7 +202,8 @@ func init() {
{
ID: "group1",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -80,7 +80,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -106,7 +106,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -107,7 +107,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{
@@ -258,7 +259,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -80,7 +80,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
{

View File

@@ -119,7 +119,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_start"},
Columns: []string{"_measurement", "_start"},
Mode: "by",
},
},
&selector,

View File

@@ -40,7 +40,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_key"},
Columns: []string{"_measurement", "_key"},
Mode: "by",
},
},
{
@@ -52,7 +53,8 @@ func init() {
{
ID: "group1",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement"},
Columns: []string{"_measurement"},
Mode: "by",
},
},
{

View File

@@ -40,7 +40,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_key"},
Columns: []string{"_measurement", "_key"},
Mode: "by",
},
},
{
@@ -52,7 +53,8 @@ func init() {
{
ID: "group1",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement"},
Columns: []string{"_measurement"},
Mode: "by",
},
},
{

View File

@@ -86,7 +86,8 @@ func init() {
{
ID: "group0",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement", "_key"},
Columns: []string{"_measurement", "_key"},
Mode: "by",
},
},
{
@@ -98,7 +99,8 @@ func init() {
{
ID: "group1",
Spec: &transformations.GroupOpSpec{
By: []string{"_measurement"},
Columns: []string{"_measurement"},
Mode: "by",
},
},
{

View File

@@ -202,11 +202,13 @@ func (t *transpilerState) transpileShowTagValues(ctx context.Context, stmt *infl
"_value": "value",
},
}, t.op("group", &transformations.GroupOpSpec{
By: []string{"_measurement"},
Columns: []string{"_measurement"},
Mode: "by",
}, t.op("distinct", &transformations.DistinctOpSpec{
Column: execute.DefaultValueColLabel,
}, t.op("group", &transformations.GroupOpSpec{
By: []string{"_measurement", "_key"},
Columns: []string{"_measurement", "_key"},
Mode: "by",
}, op)))), nil
}

View File

@@ -313,7 +313,8 @@ func (a *Aggregate) QuerySpec() (*flux.Operation, error) {
return &flux.Operation{
ID: "merge",
Spec: &transformations.GroupOpSpec{
-By: keys,
+Columns: keys,
+Mode: "by",
},
}, nil
}

View File

@@ -46,7 +46,7 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF
|> range(start: -100h)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
%s
|> group(by: ["taskID", "runID", "_measurement"])
|> group(columns: ["taskID", "runID", "_measurement"])
`, filterPart)
auth, err := pctx.GetAuthorizer(ctx)
@@ -110,7 +110,7 @@ func (qlr *QueryLogReader) ListRuns(ctx context.Context, runFilter platform.RunF
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "records" and r.taskID == %q)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(by: ["scheduledFor"])
|> group(columns: ["scheduledFor"])
|> filter(fn: (r) => r.scheduledFor < %q and r.scheduledFor > %q)
|> sort(desc: true, columns: ["_start"]) |> limit(n: 1)
@@ -122,7 +122,7 @@ main = from(bucketID: "000000000000000a")
|> filter(fn: (r) => r.runID > %q)
join(tables: {main: main, supl: supl}, on: ["_start", "_stop", "orgID", "taskID", "runID", "_measurement"])
|> group(by: ["_measurement"])
|> group(columns: ["_measurement"])
%s
|> yield(name: "result")
`, runFilter.Task.String(), scheduledBefore, scheduledAfter, runFilter.Task.String(), afterID, limit)
@@ -151,7 +151,7 @@ func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platfor
|> filter(fn: (r) => r._measurement == "records")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r.runID == %q)
|> group(by: ["scheduledFor"])
|> group(columns: ["scheduledFor"])
|> sort(desc: true, columns: ["_start"]) |> limit(n: 1)
logs = from(bucketID: "000000000000000a")

View File

@@ -68,7 +68,7 @@ describe('Logs.LogQuery', () => {
|> range(start: 2018-10-10T22:46:24.859Z, stop: 2018-10-10T22:46:54.859Z)
|> filter(fn: (r) => r._measurement == "syslog")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-|> group(none: true)
+|> group()
|> sort(columns: ["_time"])
|> map(fn: (r) => ({time: r._time,
severity: r.severity,

View File

@@ -37,7 +37,7 @@ describe('Logs.V2.queryBuilder', () => {
|> range(start: 2018-10-10T22:46:24.859Z, stop: 2018-10-10T22:46:54.859Z)
|> filter(fn: (r) => r._measurement == "syslog")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-|> group(none: true)
+|> group()
|> sort(columns: ["_time"])
|> map(fn: (r) => ({time: r._time,
severity: r.severity,
@@ -61,7 +61,7 @@ describe('Logs.V2.queryBuilder', () => {
|> range(start: 2018-10-10T22:46:24.859Z, stop: 2018-10-10T22:46:54.859Z)
|> filter(fn: (r) => r._measurement == "syslog")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-|> group(none: true)
+|> group()
|> filter(fn: (r) => r.severity != "notice")
|> sort(columns: ["_time"])
|> map(fn: (r) => ({time: r._time,
@@ -91,7 +91,7 @@ describe('Logs.V2.queryBuilder', () => {
|> range(start: 2018-10-10T22:46:24.859Z, stop: 2018-10-10T22:46:54.859Z)
|> filter(fn: (r) => r._measurement == "syslog")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-|> group(none: true)
+|> group()
|> filter(fn: (r) =>
r.severity == "notice" and
r.appname !~ /beep/ and

View File

@@ -31,7 +31,7 @@ const buildRowsQuery = (
`range(start: ${lower}, stop: ${upper})`,
`filter(fn: (${ROW_NAME}) => ${ROW_NAME}._measurement == "${measurement}")`,
`pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")`,
-`group(none: true)`,
+`group()`,
]
}

View File

@@ -366,7 +366,7 @@ export const FLUX_FUNCTIONS: FluxToolbarFunction[] = [
],
desc:
'Groups records based on their values for specific columns. It produces tables with new group keys based on provided properties.',
-example: 'group(by: ["host", "_measurement"])',
+example: 'group(columns: ["host", "_measurement"])',
category: 'Transformations',
link:
'https://docs.influxdata.com/flux/latest/functions/transformations/group',

View File

@@ -72,7 +72,7 @@ From the following request:
from(db: "telegraf")
|> range(start: -24h)
-|> group(none: true)
+|> group()
|> keys(except:["_time","_value","_start","_stop"])
|> map(fn: (r) => r._value)
*/