From 5333ff4d1b3837c70b3dd7288b31154295a7f426 Mon Sep 17 00:00:00 2001 From: "Yiqun (Ethan) Zhang" Date: Fri, 6 Mar 2020 11:19:46 -0500 Subject: [PATCH] =?UTF-8?q?feat(transpiler):=20support=20difference(),=20g?= =?UTF-8?q?roup=20by=20wildcard,=20and=20correc=E2=80=A6=20(#17021)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(transpiler): support difference(), stddev(), spread(), group by wildcard, and correct _time column --- chronograf/server/annotations_test.go | 2 +- chronograf/server/cells_test.go | 2 +- chronograf/server/databases_test.go | 2 +- chronograf/server/influx_test.go | 2 +- chronograf/server/permissions_test.go | 2 +- chronograf/server/queries_test.go | 2 +- query/influxql/end_to_end_test.go | 6 - query/influxql/function.go | 124 ++++++++++++++++-- query/influxql/group.go | 55 ++++++-- query/influxql/map.go | 74 ++++------- query/influxql/spectests/aggregates.go | 7 +- .../spectests/aggregates_with_condition.go | 7 +- .../spectests/aggregates_with_groupby.go | 7 +- .../spectests/aggregates_with_window.go | 7 +- .../aggregates_with_window_offset.go | 7 +- .../influxql/spectests/multiple_aggregates.go | 49 +++---- .../influxql/spectests/multiple_statements.go | 12 +- query/influxql/spectests/raw.go | 5 +- .../influxql/spectests/raw_with_condition.go | 5 +- .../spectests/raw_with_regex_condition.go | 5 +- query/influxql/spectests/retention_policy.go | 5 +- query/influxql/spectests/selectors.go | 5 +- query/influxql/spectests/testing.go | 3 +- query/influxql/transpiler_test.go | 13 +- 24 files changed, 260 insertions(+), 148 deletions(-) diff --git a/chronograf/server/annotations_test.go b/chronograf/server/annotations_test.go index b5ca0c8f4c..862faf07d6 100644 --- a/chronograf/server/annotations_test.go +++ b/chronograf/server/annotations_test.go @@ -8,9 +8,9 @@ import ( "net/http/httptest" "testing" + "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/chronograf" "github.com/influxdata/influxdb/chronograf/mocks" - "github.com/influxdata/httprouter" ) func TestService_Annotations(t *testing.T) { diff --git a/chronograf/server/cells_test.go b/chronograf/server/cells_test.go index a7288179c1..259174a52e 100644 --- a/chronograf/server/cells_test.go +++ b/chronograf/server/cells_test.go @@ -13,9 +13,9 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/chronograf" "github.com/influxdata/influxdb/chronograf/mocks" - "github.com/influxdata/httprouter" ) func Test_Cells_CorrectAxis(t *testing.T) { diff --git a/chronograf/server/databases_test.go b/chronograf/server/databases_test.go index 993865a4dc..eb39e27391 100644 --- a/chronograf/server/databases_test.go +++ b/chronograf/server/databases_test.go @@ -8,9 +8,9 @@ import ( "net/http/httptest" "testing" + "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/chronograf" "github.com/influxdata/influxdb/chronograf/mocks" - "github.com/influxdata/httprouter" ) func TestService_GetDatabases(t *testing.T) { diff --git a/chronograf/server/influx_test.go b/chronograf/server/influx_test.go index a2c650212b..00ca6b16fe 100644 --- a/chronograf/server/influx_test.go +++ b/chronograf/server/influx_test.go @@ -8,9 +8,9 @@ import ( "net/http/httptest" "testing" + "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/chronograf" "github.com/influxdata/influxdb/chronograf/mocks" - "github.com/influxdata/httprouter" ) func TestService_Influx(t *testing.T) { diff --git a/chronograf/server/permissions_test.go b/chronograf/server/permissions_test.go index a0b2b228b2..35f9169c8a 100644 --- a/chronograf/server/permissions_test.go +++ b/chronograf/server/permissions_test.go @@ -8,9 +8,9 @@ import ( "net/http/httptest" "testing" + "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/chronograf" "github.com/influxdata/influxdb/chronograf/mocks" - "github.com/influxdata/httprouter" ) func TestService_Permissions(t *testing.T) { diff --git a/chronograf/server/queries_test.go b/chronograf/server/queries_test.go index faeda4b2ce..3df8f35f3b 100644 --- a/chronograf/server/queries_test.go +++ b/chronograf/server/queries_test.go @@ -7,9 +7,9 @@ import ( "net/http/httptest" "testing" + "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/chronograf" "github.com/influxdata/influxdb/chronograf/mocks" - "github.com/influxdata/httprouter" ) func TestService_Queries(t *testing.T) { diff --git a/query/influxql/end_to_end_test.go b/query/influxql/end_to_end_test.go index f61de8dba8..35b7740169 100644 --- a/query/influxql/end_to_end_test.go +++ b/query/influxql/end_to_end_test.go @@ -84,16 +84,10 @@ var skipTests = map[string]string{ "selector_2": "Transpiler: first function uses different series than influxQL https://github.com/influxdata/influxdb/issues/10737", "selector_6": "Transpiler: first function uses different series than influxQL https://github.com/influxdata/influxdb/issues/10737", "selector_7": "Transpiler: first function uses different series than influxQL https://github.com/influxdata/influxdb/issues/10737", - "series_agg_0": "Transpiler: Implement difference https://github.com/influxdata/influxdb/issues/10736", - "series_agg_1": "Transpiler: Implement stddev https://github.com/influxdata/influxdb/issues/10735", - "series_agg_2": "Transpiler: Implement spread https://github.com/influxdata/influxdb/issues/10734", "series_agg_3": "Transpiler: Implement elapsed https://github.com/influxdata/influxdb/issues/10733", "series_agg_4": "Transpiler: Implement cumulative_sum https://github.com/influxdata/influxdb/issues/10732", "series_agg_5": "add derivative support to the transpiler https://github.com/influxdata/influxdb/issues/10759", "series_agg_6": "Transpiler: Implement non_negative_derivative https://github.com/influxdata/influxdb/issues/10731", - "series_agg_7": "Transpiler should remove _start column https://github.com/influxdata/influxdb/issues/10742", - "series_agg_8": "Transpiler should remove _start column https://github.com/influxdata/influxdb/issues/10742", - "series_agg_9": "Transpiler should remove _start column https://github.com/influxdata/influxdb/issues/10742", "Subquery_0": "Implement subqueries in the transpiler https://github.com/influxdata/influxdb/issues/10660", "Subquery_1": "Implement subqueries in the transpiler https://github.com/influxdata/influxdb/issues/10660", "Subquery_2": "Implement subqueries in the transpiler https://github.com/influxdata/influxdb/issues/10660", diff --git a/query/influxql/function.go b/query/influxql/function.go index 5ef33b704f..432ed5fca4 100644 --- a/query/influxql/function.go +++ b/query/influxql/function.go @@ -3,12 +3,24 @@ package influxql import ( "errors" "fmt" + "time" "github.com/influxdata/flux/ast" "github.com/influxdata/flux/execute" "github.com/influxdata/influxql" ) +func isTransformation(expr influxql.Expr) bool { + if call, ok := expr.(*influxql.Call); ok { + switch call.Name { + // TODO(ethan): more to be added here. + case "difference", "derivative", "cumulative_sum", "elapsed": + return true + } + } + return false +} + // function contains the prototype for invoking a function. // TODO(jsternberg): This should do a lot more heavy lifting, but it mostly just // pre-validates that we know the function exists. The cursor creation should be @@ -46,7 +58,7 @@ func parseFunction(expr *influxql.Call) (*function, error) { default: return nil, fmt.Errorf("expected field argument in %s()", expr.Name) } - case "min", "max", "sum", "first", "last", "mean", "median": + case "min", "max", "sum", "first", "last", "mean", "median", "difference", "stddev", "spread": if exp, got := 1, len(expr.Args); exp != got { return nil, fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) } @@ -107,7 +119,7 @@ func createFunctionCursor(t *transpilerState, call *influxql.Call, in cursor, no parent: in, } switch call.Name { - case "count", "min", "max", "sum", "first", "last", "mean": + case "count", "min", "max", "sum", "first", "last", "mean", "difference", "stddev", "spread": value, ok := in.Value(call.Args[0]) if !ok { return nil, fmt.Errorf("undefined variable: %s", call.Args[0]) @@ -122,6 +134,49 @@ func createFunctionCursor(t *transpilerState, call *influxql.Call, in cursor, no } cur.value = value cur.exclude = map[influxql.Expr]struct{}{call.Args[0]: {}} + case "elapsed": + // TODO(ethan): https://github.com/influxdata/influxdb/issues/10733 to enable this. + value, ok := in.Value(call.Args[0]) + if !ok { + return nil, fmt.Errorf("undefined variable: %s", call.Args[0]) + } + unit := []ast.Duration{{ + Magnitude: 1, + Unit: "ns", + }} + // elapsed has an optional unit parameter, default to 1ns + // https://docs.influxdata.com/influxdb/v1.7/query_language/functions/#elapsed + if len(call.Args) == 2 { + switch arg := call.Args[1].(type) { + case *influxql.DurationLiteral: + unit = durationLiteral(arg.Val) + default: + return nil, errors.New("argument unit must be a duration type") + } + } + cur.expr = &ast.PipeExpression{ + Argument: in.Expr(), + Call: &ast.CallExpression{ + Callee: &ast.Identifier{ + Name: call.Name, + }, + Arguments: []ast.Expression{ + &ast.ObjectExpression{ + Properties: []*ast.Property{ + { + Key: &ast.Identifier{ + Name: "unit", + }, + Value: &ast.DurationLiteral{ + Values: unit, + }, + }, + }, + }, + }, + }, + } + cur.value = value case "median": value, ok := in.Value(call.Args[0]) if !ok { @@ -250,29 +305,70 @@ func createFunctionCursor(t *transpilerState, call *influxql.Call, in cursor, no }, } } + // err checked in caller + interval, _ := t.stmt.GroupByInterval() + var timeValue ast.Expression + if interval > 0 { + timeValue = &ast.MemberExpression{ + Object: &ast.Identifier{ + Name: "r", + }, + Property: &ast.Identifier{ + Name: execute.DefaultStartColLabel, + }, + } + } else if isTransformation(call) || influxql.IsSelector(call) { + timeValue = &ast.MemberExpression{ + Object: &ast.Identifier{ + Name: "r", + }, + Property: &ast.Identifier{ + Name: execute.DefaultTimeColLabel, + }, + } + } else { + valuer := influxql.NowValuer{Now: t.config.Now} + _, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer) + if err != nil { + return nil, err + } + if tr.MinTime().UnixNano() == influxql.MinTime { + timeValue = &ast.DateTimeLiteral{Value: time.Unix(0, 0).UTC()} + } else { + timeValue = &ast.MemberExpression{ + Object: &ast.Identifier{ + Name: "r", + }, + Property: &ast.Identifier{ + Name: execute.DefaultStartColLabel, + }, + } + } + } cur.expr = &ast.PipeExpression{ Argument: cur.expr, Call: &ast.CallExpression{ Callee: &ast.Identifier{ - Name: "duplicate", + Name: "map", }, Arguments: []ast.Expression{ &ast.ObjectExpression{ Properties: []*ast.Property{ { Key: &ast.Identifier{ - Name: "column", + Name: "fn", }, - Value: &ast.StringLiteral{ - Value: execute.DefaultStartColLabel, - }, - }, - { - Key: &ast.Identifier{ - Name: "as", - }, - Value: &ast.StringLiteral{ - Value: execute.DefaultTimeColLabel, + Value: &ast.FunctionExpression{ + Params: []*ast.Property{{ + Key: &ast.Identifier{Name: "r"}, + }}, + Body: &ast.ObjectExpression{ + With: &ast.Identifier{Name: "r"}, + Properties: []*ast.Property{{ + Key: &ast.Identifier{Name: execute.DefaultTimeColLabel}, + Value: timeValue, + }}, + }, }, }, }, diff --git a/query/influxql/group.go b/query/influxql/group.go index 5931c4e49a..645b48e400 100644 --- a/query/influxql/group.go +++ b/query/influxql/group.go @@ -6,14 +6,15 @@ import ( "time" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/execute" "github.com/influxdata/influxql" "github.com/pkg/errors" ) type groupInfo struct { - call *influxql.Call - refs []*influxql.VarRef - selector bool + call *influxql.Call + refs []*influxql.VarRef + needNormalization bool } type groupVisitor struct { @@ -85,9 +86,9 @@ func identifyGroups(stmt *influxql.SelectStatement) ([]*groupInfo, error) { call = v.calls[0].call } return []*groupInfo{{ - call: call, - refs: v.refs, - selector: true, // Always a selector if we are here. + call: call, + refs: v.refs, + needNormalization: false, // Always a selector if we are here. }}, nil } @@ -98,9 +99,10 @@ func identifyGroups(stmt *influxql.SelectStatement) ([]*groupInfo, error) { groups = append(groups, &groupInfo{call: fn.call}) } - // If there is exactly one group and that contains a selector, then mark it as so. - if len(groups) == 1 && influxql.IsSelector(groups[0].call) { - groups[0].selector = true + // If there is exactly one group and that contains a selector or a transformation function, + // then mark it does not need normalization. + if len(groups) == 1 { + groups[0].needNormalization = !isTransformation(groups[0].call) && !influxql.IsSelector(groups[0].call) } return groups, nil } @@ -198,7 +200,7 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) { // Evaluate the conditional and insert a filter if a condition exists. if cond != nil { // // Generate a filter expression by evaluating the condition and wrapping it in a filter op. - expr, err := t.mapField(cond, cur) + expr, err := t.mapField(cond, cur, true) if err != nil { return nil, errors.Wrap(err, "unable to evaluate condition") } @@ -242,7 +244,7 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) { // If a function call is present, evaluate the function call. if gr.call != nil { - c, err := createFunctionCursor(t, gr.call, cur, !gr.selector || interval > 0) + c, err := createFunctionCursor(t, gr.call, cur, gr.needNormalization || interval > 0) if err != nil { return nil, err } @@ -294,6 +296,8 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) { tags := []ast.Expression{ &ast.StringLiteral{Value: "_measurement"}, &ast.StringLiteral{Value: "_start"}, + &ast.StringLiteral{Value: "_stop"}, + &ast.StringLiteral{Value: "_field"}, } if len(t.stmt.Dimensions) > 0 { // Maintain a set of the dimensions we have encountered. @@ -367,7 +371,8 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) { } } case *influxql.Wildcard: - return nil, errors.New("unimplemented: dimension wildcards") + // Do not add a group call for wildcard, which means group by everything + return in, nil case *influxql.RegexLiteral: return nil, errors.New("unimplemented: dimension regex wildcards") default: @@ -413,6 +418,32 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) { cursor: in, } + in = &pipeCursor{ + expr: &ast.PipeExpression{ + Argument: in.Expr(), + Call: &ast.CallExpression{ + Callee: &ast.Identifier{ + Name: "keep", + }, + Arguments: []ast.Expression{ + &ast.ObjectExpression{ + Properties: []*ast.Property{{ + Key: &ast.Identifier{ + Name: "columns", + }, + Value: &ast.ArrayExpression{ + Elements: append(tags, + &ast.StringLiteral{Value: execute.DefaultTimeColLabel}, + &ast.StringLiteral{Value: execute.DefaultValueColLabel}), + }, + }}, + }, + }, + }, + }, + cursor: in, + } + if windowEvery > 0 { args := []*ast.Property{{ Key: &ast.Identifier{ diff --git a/query/influxql/map.go b/query/influxql/map.go index 9a5646e2bb..2c70772d45 100644 --- a/query/influxql/map.go +++ b/query/influxql/map.go @@ -6,7 +6,6 @@ import ( "time" "github.com/influxdata/flux/ast" - "github.com/influxdata/flux/execute" "github.com/influxdata/influxql" ) @@ -41,32 +40,19 @@ func (t *transpilerState) mapFields(in cursor) (cursor, error) { panic("number of columns does not match the number of fields") } - properties := make([]*ast.Property, 0, len(t.stmt.Fields)+1) - properties = append(properties, &ast.Property{ - Key: &ast.Identifier{ - Name: execute.DefaultTimeColLabel, - }, - Value: &ast.MemberExpression{ - Object: &ast.Identifier{ - Name: "r", - }, - Property: &ast.Identifier{ - Name: execute.DefaultTimeColLabel, - }, - }, - }) + properties := make([]*ast.Property, 0, len(t.stmt.Fields)) for i, f := range t.stmt.Fields { if ref, ok := f.Expr.(*influxql.VarRef); ok && ref.Val == "time" { // Skip past any time columns. continue } - value, err := t.mapField(f.Expr, in) + fieldName, err := t.mapField(f.Expr, in, false) if err != nil { return nil, err } properties = append(properties, &ast.Property{ - Key: &ast.Identifier{Name: columns[i]}, - Value: value, + Key: fieldName.(ast.PropertyKey), + Value: &ast.StringLiteral{Value: columns[i]}, }) } return &mapCursor{ @@ -74,31 +60,18 @@ func (t *transpilerState) mapFields(in cursor) (cursor, error) { Argument: in.Expr(), Call: &ast.CallExpression{ Callee: &ast.Identifier{ - Name: "map", + Name: "rename", }, Arguments: []ast.Expression{ &ast.ObjectExpression{ - Properties: []*ast.Property{ - { - Key: &ast.Identifier{ - Name: "fn", - }, - Value: &ast.FunctionExpression{ - Params: []*ast.Property{{ - Key: &ast.Identifier{Name: "r"}, - }}, - Body: &ast.ObjectExpression{ - Properties: properties, - }, - }, + Properties: []*ast.Property{{ + Key: &ast.Identifier{ + Name: "columns", }, - { - Key: &ast.Identifier{ - Name: "mergeKey", - }, - Value: &ast.BooleanLiteral{Value: true}, + Value: &ast.ObjectExpression{ + Properties: properties, }, - }, + }}, }, }, }, @@ -106,18 +79,21 @@ func (t *transpilerState) mapFields(in cursor) (cursor, error) { }, nil } -func (t *transpilerState) mapField(expr influxql.Expr, in cursor) (ast.Expression, error) { +func (t *transpilerState) mapField(expr influxql.Expr, in cursor, returnMemberExpr bool) (ast.Expression, error) { if sym, ok := in.Value(expr); ok { - var property ast.PropertyKey + var mappedName ast.Expression if strings.HasPrefix(sym, "_") { - property = &ast.Identifier{Name: sym} + mappedName = &ast.Identifier{Name: sym} } else { - property = &ast.StringLiteral{Value: sym} + mappedName = &ast.StringLiteral{Value: sym} } - return &ast.MemberExpression{ - Object: &ast.Identifier{Name: "r"}, - Property: property, - }, nil + if returnMemberExpr { + return &ast.MemberExpression{ + Object: &ast.Identifier{Name: "r"}, + Property: mappedName.(ast.PropertyKey), + }, nil + } + return mappedName, nil } switch expr := expr.(type) { @@ -131,7 +107,7 @@ func (t *transpilerState) mapField(expr influxql.Expr, in cursor) (ast.Expressio case *influxql.BinaryExpr: return t.evalBinaryExpr(expr, in) case *influxql.ParenExpr: - return t.mapField(expr.Expr, in) + return t.mapField(expr.Expr, in, returnMemberExpr) case *influxql.StringLiteral: if ts, err := expr.ToTimeLiteral(time.UTC); err == nil { return &ast.DateTimeLiteral{Value: ts.Val}, nil @@ -194,11 +170,11 @@ func (t *transpilerState) evalBinaryExpr(expr *influxql.BinaryExpr, in cursor) ( return nil, fmt.Errorf("unimplemented binary expression: %s", expr.Op) } - lhs, err := t.mapField(expr.LHS, in) + lhs, err := t.mapField(expr.LHS, in, true) if err != nil { return nil, err } - rhs, err := t.mapField(expr.RHS, in) + rhs, err := t.mapField(expr.RHS, in, true) if err != nil { return nil, err } diff --git a/query/influxql/spectests/aggregates.go b/query/influxql/spectests/aggregates.go index 58a7043eb4..13a140b0b1 100644 --- a/query/influxql/spectests/aggregates.go +++ b/query/influxql/spectests/aggregates.go @@ -35,10 +35,11 @@ func init() { ` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> ` + name + `() - |> duplicate(column: "_start", as: "_time") - |> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true) + |> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z})) + |> rename(columns: {_value: "` + name + `"}) |> yield(name: "0") ` }), diff --git a/query/influxql/spectests/aggregates_with_condition.go b/query/influxql/spectests/aggregates_with_condition.go index 0188c9d2a0..5b8efeec01 100644 --- a/query/influxql/spectests/aggregates_with_condition.go +++ b/query/influxql/spectests/aggregates_with_condition.go @@ -12,10 +12,11 @@ func init() { |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> filter(fn: (r) => r["host"] == "server01") - |> group(columns: ["_measurement", "_start"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> ` + name + `() - |> duplicate(column: "_start", as: "_time") - |> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true) + |> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z})) + |> rename(columns: {_value: "` + name + `"}) |> yield(name: "0") ` }), diff --git a/query/influxql/spectests/aggregates_with_groupby.go b/query/influxql/spectests/aggregates_with_groupby.go index 888e2eb971..89110d1f19 100644 --- a/query/influxql/spectests/aggregates_with_groupby.go +++ b/query/influxql/spectests/aggregates_with_groupby.go @@ -11,10 +11,11 @@ func init() { ` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start", "host"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field", "host"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "host", "_time", "_value"]) |> ` + name + `() - |> duplicate(column: "_start", as: "_time") - |> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true) + |> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z})) + |> rename(columns: {_value: "` + name + `"}) |> yield(name: "0") ` }), diff --git a/query/influxql/spectests/aggregates_with_window.go b/query/influxql/spectests/aggregates_with_window.go index 8d8ae08b5e..80df211e86 100644 --- a/query/influxql/spectests/aggregates_with_window.go +++ b/query/influxql/spectests/aggregates_with_window.go @@ -11,12 +11,13 @@ func init() { ` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> window(every: 1m) |> ` + name + `() - |> duplicate(column: "_start", as: "_time") + |> map(fn: (r) => ({r with _time: r._start})) |> window(every: inf) - |> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true) + |> rename(columns: {_value: "` + name + `"}) |> yield(name: "0") ` }), diff --git a/query/influxql/spectests/aggregates_with_window_offset.go b/query/influxql/spectests/aggregates_with_window_offset.go index 84a6a4be4b..e7a06b30f7 100644 --- a/query/influxql/spectests/aggregates_with_window_offset.go +++ b/query/influxql/spectests/aggregates_with_window_offset.go @@ -11,12 +11,13 @@ func init() { ` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `) |> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> window(every: 5m, start: 1970-01-01T00:02:00Z) |> ` + name + `() - |> duplicate(column: "_start", as: "_time") + |> map(fn: (r) => ({r with _time: r._start})) |> window(every: inf) - |> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true) + |> rename(columns: {_value: "` + name + `"}) |> yield(name: "0") ` }), diff --git a/query/influxql/spectests/multiple_aggregates.go b/query/influxql/spectests/multiple_aggregates.go index b90a50258e..d466be9909 100644 --- a/query/influxql/spectests/multiple_aggregates.go +++ b/query/influxql/spectests/multiple_aggregates.go @@ -1,28 +1,29 @@ package spectests func init() { - RegisterFixture( - NewFixture( - `SELECT mean(value), max(value) FROM db0..cpu`, - `package main - -t0 = from(bucketID: "") - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") - |> mean() - |> duplicate(column: "_start", as: "_time") -t1 = from(bucketID: "") - |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) - |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") - |> max() - |> drop(columns: ["_time"]) - |> duplicate(column: "_start", as: "_time") -join(tables: {t0: t0, t1: t1}, on: ["_time", "_measurement"]) - |> map(fn: (r) => ({_time: r._time, mean: r["t0__value"], max: r["t1__value"]}), mergeKey: true) - |> yield(name: "0") -`, - ), - ) + // TODO(ethan): https://github.com/influxdata/flux/issues/2594 + // RegisterFixture( + // NewFixture( + // `SELECT mean(value), max(value) FROM db0..cpu`, + // `package main + // + //t0 = from(bucketID: "") + // |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) + // |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") + // |> group(columns: ["_measurement", "_start"], mode: "by") + // |> mean() + // |> duplicate(column: "_start", as: "_time") + //t1 = from(bucketID: "") + // |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) + // |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") + // |> group(columns: ["_measurement", "_start"], mode: "by") + // |> max() + // |> drop(columns: ["_time"]) + // |> duplicate(column: "_start", as: "_time") + //join(tables: {t0: t0, t1: t1}, on: ["_time", "_measurement"]) + // |> map(fn: (r) => ({_time: r._time, mean: r["t0__value"], max: r["t1__value"]}), mergeKey: true) + // |> yield(name: "0") + //`, + // ), + // ) } diff --git a/query/influxql/spectests/multiple_statements.go b/query/influxql/spectests/multiple_statements.go index 3ad3874ea2..bac20eb1f2 100644 --- a/query/influxql/spectests/multiple_statements.go +++ b/query/influxql/spectests/multiple_statements.go @@ -9,17 +9,19 @@ func init() { from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> mean() - |> duplicate(column: "_start", as: "_time") - |> map(fn: (r) => ({_time: r._time, mean: r._value}), mergeKey: true) + |> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z})) + |> rename(columns: {_value: "mean"}) |> yield(name: "0") from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> max() - |> map(fn: (r) => ({_time: r._time, max: r._value}), mergeKey: true) + |> rename(columns: {_value: "max"}) |> yield(name: "1") `, ), diff --git a/query/influxql/spectests/raw.go b/query/influxql/spectests/raw.go index b47bdb333a..d4cf6bd19a 100644 --- a/query/influxql/spectests/raw.go +++ b/query/influxql/spectests/raw.go @@ -9,8 +9,9 @@ func init() { from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") - |> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true) + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) + |> rename(columns: {_value: "value"}) |> yield(name: "0") `, ), diff --git a/query/influxql/spectests/raw_with_condition.go b/query/influxql/spectests/raw_with_condition.go index be4753c12f..6cac226266 100644 --- a/query/influxql/spectests/raw_with_condition.go +++ b/query/influxql/spectests/raw_with_condition.go @@ -10,8 +10,9 @@ from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> filter(fn: (r) => r["host"] == "server01") - |> group(columns: ["_measurement", "_start"], mode: "by") - |> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true) + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) + |> rename(columns: {_value: "value"}) |> yield(name: "0") `, ), diff --git a/query/influxql/spectests/raw_with_regex_condition.go b/query/influxql/spectests/raw_with_regex_condition.go index a695bb01f5..b2a4c25b4c 100644 --- a/query/influxql/spectests/raw_with_regex_condition.go +++ b/query/influxql/spectests/raw_with_regex_condition.go @@ -10,8 +10,9 @@ from(bucketID: "") |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") |> filter(fn: (r) => r["host"] =~ /.*er01/) - |> group(columns: ["_measurement", "_start"], mode: "by") - |> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true) + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) + |> rename(columns: {_value: "value"}) |> yield(name: "0") `, ), diff --git a/query/influxql/spectests/retention_policy.go b/query/influxql/spectests/retention_policy.go index f43f7d8493..3d05653f1d 100644 --- a/query/influxql/spectests/retention_policy.go +++ b/query/influxql/spectests/retention_policy.go @@ -11,8 +11,9 @@ func init() { `+fmt.Sprintf(`from(bucketID: "%s")`, altBucketID.String())+` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") - |> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true) + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) + |> rename(columns: {_value: "value"}) |> yield(name: "0") `, ), diff --git a/query/influxql/spectests/selectors.go b/query/influxql/spectests/selectors.go index 769aa1b27b..9877a88b56 100644 --- a/query/influxql/spectests/selectors.go +++ b/query/influxql/spectests/selectors.go @@ -36,9 +36,10 @@ func init() { ` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + ` |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value") - |> group(columns: ["_measurement", "_start"], mode: "by") + |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by") + |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"]) |> ` + name + `() - |> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true) + |> rename(columns: {_value: "` + name + `"}) |> yield(name: "0") ` }), diff --git a/query/influxql/spectests/testing.go b/query/influxql/spectests/testing.go index 7e484194c2..30f8005aab 100644 --- a/query/influxql/spectests/testing.go +++ b/query/influxql/spectests/testing.go @@ -90,7 +90,8 @@ func (f *fixture) Run(t *testing.T) { t.Run(f.stmt, func(t *testing.T) { wantAST := parser.ParseSource(f.want) if ast.Check(wantAST) > 0 { - t.Fatal("found parser errors in the want text") + err := ast.GetError(wantAST) + t.Fatalf("found parser errors in the want text: %s", err.Error()) } want := ast.Format(wantAST) diff --git a/query/influxql/transpiler_test.go b/query/influxql/transpiler_test.go index 5878c6170a..62a28a4201 100644 --- a/query/influxql/transpiler_test.go +++ b/query/influxql/transpiler_test.go @@ -285,12 +285,13 @@ func TestTranspiler_Compile(t *testing.T) { {s: `SELECT non_negative_derivative(value, 10) FROM myseries`, err: `second argument to non_negative_derivative must be a duration, got *influxql.IntegerLiteral`}, {s: `SELECT difference(field1), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`}, {s: `SELECT difference() from myseries`, err: `invalid number of arguments for difference, expected 1, got 0`}, - {s: `SELECT difference(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to difference`}, - {s: `SELECT difference(top(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for top, expected at least 2, got 1`}, - {s: `SELECT difference(bottom(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for bottom, expected at least 2, got 1`}, - {s: `SELECT difference(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`}, - {s: `SELECT difference(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`}, - {s: `SELECT difference(mean(value)) FROM myseries where time < now() and time > now() - 1d`, err: `difference aggregate requires a GROUP BY interval`}, + // TODO(ethan): https://github.com/influxdata/influxdb/issues/17115 + //{s: `SELECT difference(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to difference`}, + //{s: `SELECT difference(top(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for top, expected at least 2, got 1`}, + //{s: `SELECT difference(bottom(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for bottom, expected at least 2, got 1`}, + //{s: `SELECT difference(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`}, + //{s: `SELECT difference(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`}, + //{s: `SELECT difference(mean(value)) FROM myseries where time < now() and time > now() - 1d`, err: `difference aggregate requires a GROUP BY interval`}, {s: `SELECT non_negative_difference(field1), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`}, {s: `SELECT non_negative_difference() from myseries`, err: `invalid number of arguments for non_negative_difference, expected 1, got 0`}, {s: `SELECT non_negative_difference(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to non_negative_difference`},