feat(transpiler): support difference(), group by wildcard, and correc… (#17021)
* feat(transpiler): support difference(), stddev(), spread(), group by wildcard, and correct _time columnpull/17119/head
parent
95c72c1679
commit
5333ff4d1b
|
|
@ -8,9 +8,9 @@ import (
|
|||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb/chronograf"
|
||||
"github.com/influxdata/influxdb/chronograf/mocks"
|
||||
"github.com/influxdata/httprouter"
|
||||
)
|
||||
|
||||
func TestService_Annotations(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -13,9 +13,9 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb/chronograf"
|
||||
"github.com/influxdata/influxdb/chronograf/mocks"
|
||||
"github.com/influxdata/httprouter"
|
||||
)
|
||||
|
||||
func Test_Cells_CorrectAxis(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ import (
|
|||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb/chronograf"
|
||||
"github.com/influxdata/influxdb/chronograf/mocks"
|
||||
"github.com/influxdata/httprouter"
|
||||
)
|
||||
|
||||
func TestService_GetDatabases(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ import (
|
|||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb/chronograf"
|
||||
"github.com/influxdata/influxdb/chronograf/mocks"
|
||||
"github.com/influxdata/httprouter"
|
||||
)
|
||||
|
||||
func TestService_Influx(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ import (
|
|||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb/chronograf"
|
||||
"github.com/influxdata/influxdb/chronograf/mocks"
|
||||
"github.com/influxdata/httprouter"
|
||||
)
|
||||
|
||||
func TestService_Permissions(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -7,9 +7,9 @@ import (
|
|||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb/chronograf"
|
||||
"github.com/influxdata/influxdb/chronograf/mocks"
|
||||
"github.com/influxdata/httprouter"
|
||||
)
|
||||
|
||||
func TestService_Queries(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -84,16 +84,10 @@ var skipTests = map[string]string{
|
|||
"selector_2": "Transpiler: first function uses different series than influxQL https://github.com/influxdata/influxdb/issues/10737",
|
||||
"selector_6": "Transpiler: first function uses different series than influxQL https://github.com/influxdata/influxdb/issues/10737",
|
||||
"selector_7": "Transpiler: first function uses different series than influxQL https://github.com/influxdata/influxdb/issues/10737",
|
||||
"series_agg_0": "Transpiler: Implement difference https://github.com/influxdata/influxdb/issues/10736",
|
||||
"series_agg_1": "Transpiler: Implement stddev https://github.com/influxdata/influxdb/issues/10735",
|
||||
"series_agg_2": "Transpiler: Implement spread https://github.com/influxdata/influxdb/issues/10734",
|
||||
"series_agg_3": "Transpiler: Implement elapsed https://github.com/influxdata/influxdb/issues/10733",
|
||||
"series_agg_4": "Transpiler: Implement cumulative_sum https://github.com/influxdata/influxdb/issues/10732",
|
||||
"series_agg_5": "add derivative support to the transpiler https://github.com/influxdata/influxdb/issues/10759",
|
||||
"series_agg_6": "Transpiler: Implement non_negative_derivative https://github.com/influxdata/influxdb/issues/10731",
|
||||
"series_agg_7": "Transpiler should remove _start column https://github.com/influxdata/influxdb/issues/10742",
|
||||
"series_agg_8": "Transpiler should remove _start column https://github.com/influxdata/influxdb/issues/10742",
|
||||
"series_agg_9": "Transpiler should remove _start column https://github.com/influxdata/influxdb/issues/10742",
|
||||
"Subquery_0": "Implement subqueries in the transpiler https://github.com/influxdata/influxdb/issues/10660",
|
||||
"Subquery_1": "Implement subqueries in the transpiler https://github.com/influxdata/influxdb/issues/10660",
|
||||
"Subquery_2": "Implement subqueries in the transpiler https://github.com/influxdata/influxdb/issues/10660",
|
||||
|
|
|
|||
|
|
@ -3,12 +3,24 @@ package influxql
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
func isTransformation(expr influxql.Expr) bool {
|
||||
if call, ok := expr.(*influxql.Call); ok {
|
||||
switch call.Name {
|
||||
// TODO(ethan): more to be added here.
|
||||
case "difference", "derivative", "cumulative_sum", "elapsed":
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// function contains the prototype for invoking a function.
|
||||
// TODO(jsternberg): This should do a lot more heavy lifting, but it mostly just
|
||||
// pre-validates that we know the function exists. The cursor creation should be
|
||||
|
|
@ -46,7 +58,7 @@ func parseFunction(expr *influxql.Call) (*function, error) {
|
|||
default:
|
||||
return nil, fmt.Errorf("expected field argument in %s()", expr.Name)
|
||||
}
|
||||
case "min", "max", "sum", "first", "last", "mean", "median":
|
||||
case "min", "max", "sum", "first", "last", "mean", "median", "difference", "stddev", "spread":
|
||||
if exp, got := 1, len(expr.Args); exp != got {
|
||||
return nil, fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
|
||||
}
|
||||
|
|
@ -107,7 +119,7 @@ func createFunctionCursor(t *transpilerState, call *influxql.Call, in cursor, no
|
|||
parent: in,
|
||||
}
|
||||
switch call.Name {
|
||||
case "count", "min", "max", "sum", "first", "last", "mean":
|
||||
case "count", "min", "max", "sum", "first", "last", "mean", "difference", "stddev", "spread":
|
||||
value, ok := in.Value(call.Args[0])
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("undefined variable: %s", call.Args[0])
|
||||
|
|
@ -122,6 +134,49 @@ func createFunctionCursor(t *transpilerState, call *influxql.Call, in cursor, no
|
|||
}
|
||||
cur.value = value
|
||||
cur.exclude = map[influxql.Expr]struct{}{call.Args[0]: {}}
|
||||
case "elapsed":
|
||||
// TODO(ethan): https://github.com/influxdata/influxdb/issues/10733 to enable this.
|
||||
value, ok := in.Value(call.Args[0])
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("undefined variable: %s", call.Args[0])
|
||||
}
|
||||
unit := []ast.Duration{{
|
||||
Magnitude: 1,
|
||||
Unit: "ns",
|
||||
}}
|
||||
// elapsed has an optional unit parameter, default to 1ns
|
||||
// https://docs.influxdata.com/influxdb/v1.7/query_language/functions/#elapsed
|
||||
if len(call.Args) == 2 {
|
||||
switch arg := call.Args[1].(type) {
|
||||
case *influxql.DurationLiteral:
|
||||
unit = durationLiteral(arg.Val)
|
||||
default:
|
||||
return nil, errors.New("argument unit must be a duration type")
|
||||
}
|
||||
}
|
||||
cur.expr = &ast.PipeExpression{
|
||||
Argument: in.Expr(),
|
||||
Call: &ast.CallExpression{
|
||||
Callee: &ast.Identifier{
|
||||
Name: call.Name,
|
||||
},
|
||||
Arguments: []ast.Expression{
|
||||
&ast.ObjectExpression{
|
||||
Properties: []*ast.Property{
|
||||
{
|
||||
Key: &ast.Identifier{
|
||||
Name: "unit",
|
||||
},
|
||||
Value: &ast.DurationLiteral{
|
||||
Values: unit,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
cur.value = value
|
||||
case "median":
|
||||
value, ok := in.Value(call.Args[0])
|
||||
if !ok {
|
||||
|
|
@ -250,29 +305,70 @@ func createFunctionCursor(t *transpilerState, call *influxql.Call, in cursor, no
|
|||
},
|
||||
}
|
||||
}
|
||||
// err checked in caller
|
||||
interval, _ := t.stmt.GroupByInterval()
|
||||
var timeValue ast.Expression
|
||||
if interval > 0 {
|
||||
timeValue = &ast.MemberExpression{
|
||||
Object: &ast.Identifier{
|
||||
Name: "r",
|
||||
},
|
||||
Property: &ast.Identifier{
|
||||
Name: execute.DefaultStartColLabel,
|
||||
},
|
||||
}
|
||||
} else if isTransformation(call) || influxql.IsSelector(call) {
|
||||
timeValue = &ast.MemberExpression{
|
||||
Object: &ast.Identifier{
|
||||
Name: "r",
|
||||
},
|
||||
Property: &ast.Identifier{
|
||||
Name: execute.DefaultTimeColLabel,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
valuer := influxql.NowValuer{Now: t.config.Now}
|
||||
_, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tr.MinTime().UnixNano() == influxql.MinTime {
|
||||
timeValue = &ast.DateTimeLiteral{Value: time.Unix(0, 0).UTC()}
|
||||
} else {
|
||||
timeValue = &ast.MemberExpression{
|
||||
Object: &ast.Identifier{
|
||||
Name: "r",
|
||||
},
|
||||
Property: &ast.Identifier{
|
||||
Name: execute.DefaultStartColLabel,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
cur.expr = &ast.PipeExpression{
|
||||
Argument: cur.expr,
|
||||
Call: &ast.CallExpression{
|
||||
Callee: &ast.Identifier{
|
||||
Name: "duplicate",
|
||||
Name: "map",
|
||||
},
|
||||
Arguments: []ast.Expression{
|
||||
&ast.ObjectExpression{
|
||||
Properties: []*ast.Property{
|
||||
{
|
||||
Key: &ast.Identifier{
|
||||
Name: "column",
|
||||
Name: "fn",
|
||||
},
|
||||
Value: &ast.StringLiteral{
|
||||
Value: execute.DefaultStartColLabel,
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: &ast.Identifier{
|
||||
Name: "as",
|
||||
},
|
||||
Value: &ast.StringLiteral{
|
||||
Value: execute.DefaultTimeColLabel,
|
||||
Value: &ast.FunctionExpression{
|
||||
Params: []*ast.Property{{
|
||||
Key: &ast.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: &ast.ObjectExpression{
|
||||
With: &ast.Identifier{Name: "r"},
|
||||
Properties: []*ast.Property{{
|
||||
Key: &ast.Identifier{Name: execute.DefaultTimeColLabel},
|
||||
Value: timeValue,
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
|||
|
|
@ -6,14 +6,15 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type groupInfo struct {
|
||||
call *influxql.Call
|
||||
refs []*influxql.VarRef
|
||||
selector bool
|
||||
call *influxql.Call
|
||||
refs []*influxql.VarRef
|
||||
needNormalization bool
|
||||
}
|
||||
|
||||
type groupVisitor struct {
|
||||
|
|
@ -85,9 +86,9 @@ func identifyGroups(stmt *influxql.SelectStatement) ([]*groupInfo, error) {
|
|||
call = v.calls[0].call
|
||||
}
|
||||
return []*groupInfo{{
|
||||
call: call,
|
||||
refs: v.refs,
|
||||
selector: true, // Always a selector if we are here.
|
||||
call: call,
|
||||
refs: v.refs,
|
||||
needNormalization: false, // Always a selector if we are here.
|
||||
}}, nil
|
||||
}
|
||||
|
||||
|
|
@ -98,9 +99,10 @@ func identifyGroups(stmt *influxql.SelectStatement) ([]*groupInfo, error) {
|
|||
groups = append(groups, &groupInfo{call: fn.call})
|
||||
}
|
||||
|
||||
// If there is exactly one group and that contains a selector, then mark it as so.
|
||||
if len(groups) == 1 && influxql.IsSelector(groups[0].call) {
|
||||
groups[0].selector = true
|
||||
// If there is exactly one group and that contains a selector or a transformation function,
|
||||
// then mark it does not need normalization.
|
||||
if len(groups) == 1 {
|
||||
groups[0].needNormalization = !isTransformation(groups[0].call) && !influxql.IsSelector(groups[0].call)
|
||||
}
|
||||
return groups, nil
|
||||
}
|
||||
|
|
@ -198,7 +200,7 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {
|
|||
// Evaluate the conditional and insert a filter if a condition exists.
|
||||
if cond != nil {
|
||||
// // Generate a filter expression by evaluating the condition and wrapping it in a filter op.
|
||||
expr, err := t.mapField(cond, cur)
|
||||
expr, err := t.mapField(cond, cur, true)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to evaluate condition")
|
||||
}
|
||||
|
|
@ -242,7 +244,7 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {
|
|||
|
||||
// If a function call is present, evaluate the function call.
|
||||
if gr.call != nil {
|
||||
c, err := createFunctionCursor(t, gr.call, cur, !gr.selector || interval > 0)
|
||||
c, err := createFunctionCursor(t, gr.call, cur, gr.needNormalization || interval > 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -294,6 +296,8 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
|
|||
tags := []ast.Expression{
|
||||
&ast.StringLiteral{Value: "_measurement"},
|
||||
&ast.StringLiteral{Value: "_start"},
|
||||
&ast.StringLiteral{Value: "_stop"},
|
||||
&ast.StringLiteral{Value: "_field"},
|
||||
}
|
||||
if len(t.stmt.Dimensions) > 0 {
|
||||
// Maintain a set of the dimensions we have encountered.
|
||||
|
|
@ -367,7 +371,8 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
|
|||
}
|
||||
}
|
||||
case *influxql.Wildcard:
|
||||
return nil, errors.New("unimplemented: dimension wildcards")
|
||||
// Do not add a group call for wildcard, which means group by everything
|
||||
return in, nil
|
||||
case *influxql.RegexLiteral:
|
||||
return nil, errors.New("unimplemented: dimension regex wildcards")
|
||||
default:
|
||||
|
|
@ -413,6 +418,32 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
|
|||
cursor: in,
|
||||
}
|
||||
|
||||
in = &pipeCursor{
|
||||
expr: &ast.PipeExpression{
|
||||
Argument: in.Expr(),
|
||||
Call: &ast.CallExpression{
|
||||
Callee: &ast.Identifier{
|
||||
Name: "keep",
|
||||
},
|
||||
Arguments: []ast.Expression{
|
||||
&ast.ObjectExpression{
|
||||
Properties: []*ast.Property{{
|
||||
Key: &ast.Identifier{
|
||||
Name: "columns",
|
||||
},
|
||||
Value: &ast.ArrayExpression{
|
||||
Elements: append(tags,
|
||||
&ast.StringLiteral{Value: execute.DefaultTimeColLabel},
|
||||
&ast.StringLiteral{Value: execute.DefaultValueColLabel}),
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
cursor: in,
|
||||
}
|
||||
|
||||
if windowEvery > 0 {
|
||||
args := []*ast.Property{{
|
||||
Key: &ast.Identifier{
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
|
|
@ -41,32 +40,19 @@ func (t *transpilerState) mapFields(in cursor) (cursor, error) {
|
|||
panic("number of columns does not match the number of fields")
|
||||
}
|
||||
|
||||
properties := make([]*ast.Property, 0, len(t.stmt.Fields)+1)
|
||||
properties = append(properties, &ast.Property{
|
||||
Key: &ast.Identifier{
|
||||
Name: execute.DefaultTimeColLabel,
|
||||
},
|
||||
Value: &ast.MemberExpression{
|
||||
Object: &ast.Identifier{
|
||||
Name: "r",
|
||||
},
|
||||
Property: &ast.Identifier{
|
||||
Name: execute.DefaultTimeColLabel,
|
||||
},
|
||||
},
|
||||
})
|
||||
properties := make([]*ast.Property, 0, len(t.stmt.Fields))
|
||||
for i, f := range t.stmt.Fields {
|
||||
if ref, ok := f.Expr.(*influxql.VarRef); ok && ref.Val == "time" {
|
||||
// Skip past any time columns.
|
||||
continue
|
||||
}
|
||||
value, err := t.mapField(f.Expr, in)
|
||||
fieldName, err := t.mapField(f.Expr, in, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
properties = append(properties, &ast.Property{
|
||||
Key: &ast.Identifier{Name: columns[i]},
|
||||
Value: value,
|
||||
Key: fieldName.(ast.PropertyKey),
|
||||
Value: &ast.StringLiteral{Value: columns[i]},
|
||||
})
|
||||
}
|
||||
return &mapCursor{
|
||||
|
|
@ -74,31 +60,18 @@ func (t *transpilerState) mapFields(in cursor) (cursor, error) {
|
|||
Argument: in.Expr(),
|
||||
Call: &ast.CallExpression{
|
||||
Callee: &ast.Identifier{
|
||||
Name: "map",
|
||||
Name: "rename",
|
||||
},
|
||||
Arguments: []ast.Expression{
|
||||
&ast.ObjectExpression{
|
||||
Properties: []*ast.Property{
|
||||
{
|
||||
Key: &ast.Identifier{
|
||||
Name: "fn",
|
||||
},
|
||||
Value: &ast.FunctionExpression{
|
||||
Params: []*ast.Property{{
|
||||
Key: &ast.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: &ast.ObjectExpression{
|
||||
Properties: properties,
|
||||
},
|
||||
},
|
||||
Properties: []*ast.Property{{
|
||||
Key: &ast.Identifier{
|
||||
Name: "columns",
|
||||
},
|
||||
{
|
||||
Key: &ast.Identifier{
|
||||
Name: "mergeKey",
|
||||
},
|
||||
Value: &ast.BooleanLiteral{Value: true},
|
||||
Value: &ast.ObjectExpression{
|
||||
Properties: properties,
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
@ -106,18 +79,21 @@ func (t *transpilerState) mapFields(in cursor) (cursor, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (t *transpilerState) mapField(expr influxql.Expr, in cursor) (ast.Expression, error) {
|
||||
func (t *transpilerState) mapField(expr influxql.Expr, in cursor, returnMemberExpr bool) (ast.Expression, error) {
|
||||
if sym, ok := in.Value(expr); ok {
|
||||
var property ast.PropertyKey
|
||||
var mappedName ast.Expression
|
||||
if strings.HasPrefix(sym, "_") {
|
||||
property = &ast.Identifier{Name: sym}
|
||||
mappedName = &ast.Identifier{Name: sym}
|
||||
} else {
|
||||
property = &ast.StringLiteral{Value: sym}
|
||||
mappedName = &ast.StringLiteral{Value: sym}
|
||||
}
|
||||
return &ast.MemberExpression{
|
||||
Object: &ast.Identifier{Name: "r"},
|
||||
Property: property,
|
||||
}, nil
|
||||
if returnMemberExpr {
|
||||
return &ast.MemberExpression{
|
||||
Object: &ast.Identifier{Name: "r"},
|
||||
Property: mappedName.(ast.PropertyKey),
|
||||
}, nil
|
||||
}
|
||||
return mappedName, nil
|
||||
}
|
||||
|
||||
switch expr := expr.(type) {
|
||||
|
|
@ -131,7 +107,7 @@ func (t *transpilerState) mapField(expr influxql.Expr, in cursor) (ast.Expressio
|
|||
case *influxql.BinaryExpr:
|
||||
return t.evalBinaryExpr(expr, in)
|
||||
case *influxql.ParenExpr:
|
||||
return t.mapField(expr.Expr, in)
|
||||
return t.mapField(expr.Expr, in, returnMemberExpr)
|
||||
case *influxql.StringLiteral:
|
||||
if ts, err := expr.ToTimeLiteral(time.UTC); err == nil {
|
||||
return &ast.DateTimeLiteral{Value: ts.Val}, nil
|
||||
|
|
@ -194,11 +170,11 @@ func (t *transpilerState) evalBinaryExpr(expr *influxql.BinaryExpr, in cursor) (
|
|||
return nil, fmt.Errorf("unimplemented binary expression: %s", expr.Op)
|
||||
}
|
||||
|
||||
lhs, err := t.mapField(expr.LHS, in)
|
||||
lhs, err := t.mapField(expr.LHS, in, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rhs, err := t.mapField(expr.RHS, in)
|
||||
rhs, err := t.mapField(expr.RHS, in, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,10 +35,11 @@ func init() {
|
|||
` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + `
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> ` + name + `()
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
|> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true)
|
||||
|> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z}))
|
||||
|> rename(columns: {_value: "` + name + `"})
|
||||
|> yield(name: "0")
|
||||
`
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -12,10 +12,11 @@ func init() {
|
|||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> filter(fn: (r) => r["host"] == "server01")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> ` + name + `()
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
|> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true)
|
||||
|> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z}))
|
||||
|> rename(columns: {_value: "` + name + `"})
|
||||
|> yield(name: "0")
|
||||
`
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -11,10 +11,11 @@ func init() {
|
|||
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `)
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start", "host"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field", "host"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "host", "_time", "_value"])
|
||||
|> ` + name + `()
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
|> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true)
|
||||
|> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z}))
|
||||
|> rename(columns: {_value: "` + name + `"})
|
||||
|> yield(name: "0")
|
||||
`
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -11,12 +11,13 @@ func init() {
|
|||
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `)
|
||||
|> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> window(every: 1m)
|
||||
|> ` + name + `()
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
|> map(fn: (r) => ({r with _time: r._start}))
|
||||
|> window(every: inf)
|
||||
|> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true)
|
||||
|> rename(columns: {_value: "` + name + `"})
|
||||
|> yield(name: "0")
|
||||
`
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -11,12 +11,13 @@ func init() {
|
|||
` + fmt.Sprintf(`from(bucketID: "%s"`, bucketID.String()) + `)
|
||||
|> range(start: 2010-09-15T08:50:00Z, stop: 2010-09-15T09:00:00Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> window(every: 5m, start: 1970-01-01T00:02:00Z)
|
||||
|> ` + name + `()
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
|> map(fn: (r) => ({r with _time: r._start}))
|
||||
|> window(every: inf)
|
||||
|> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true)
|
||||
|> rename(columns: {_value: "` + name + `"})
|
||||
|> yield(name: "0")
|
||||
`
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -1,28 +1,29 @@
|
|||
package spectests
|
||||
|
||||
func init() {
|
||||
RegisterFixture(
|
||||
NewFixture(
|
||||
`SELECT mean(value), max(value) FROM db0..cpu`,
|
||||
`package main
|
||||
|
||||
t0 = from(bucketID: "")
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> mean()
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
t1 = from(bucketID: "")
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> max()
|
||||
|> drop(columns: ["_time"])
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
join(tables: {t0: t0, t1: t1}, on: ["_time", "_measurement"])
|
||||
|> map(fn: (r) => ({_time: r._time, mean: r["t0__value"], max: r["t1__value"]}), mergeKey: true)
|
||||
|> yield(name: "0")
|
||||
`,
|
||||
),
|
||||
)
|
||||
// TODO(ethan): https://github.com/influxdata/flux/issues/2594
|
||||
// RegisterFixture(
|
||||
// NewFixture(
|
||||
// `SELECT mean(value), max(value) FROM db0..cpu`,
|
||||
// `package main
|
||||
//
|
||||
//t0 = from(bucketID: "")
|
||||
// |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
// |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
// |> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
// |> mean()
|
||||
// |> duplicate(column: "_start", as: "_time")
|
||||
//t1 = from(bucketID: "")
|
||||
// |> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
// |> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
// |> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
// |> max()
|
||||
// |> drop(columns: ["_time"])
|
||||
// |> duplicate(column: "_start", as: "_time")
|
||||
//join(tables: {t0: t0, t1: t1}, on: ["_time", "_measurement"])
|
||||
// |> map(fn: (r) => ({_time: r._time, mean: r["t0__value"], max: r["t1__value"]}), mergeKey: true)
|
||||
// |> yield(name: "0")
|
||||
//`,
|
||||
// ),
|
||||
// )
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,17 +9,19 @@ func init() {
|
|||
from(bucketID: "")
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> mean()
|
||||
|> duplicate(column: "_start", as: "_time")
|
||||
|> map(fn: (r) => ({_time: r._time, mean: r._value}), mergeKey: true)
|
||||
|> map(fn: (r) => ({r with _time: 1970-01-01T00:00:00Z}))
|
||||
|> rename(columns: {_value: "mean"})
|
||||
|> yield(name: "0")
|
||||
from(bucketID: "")
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> max()
|
||||
|> map(fn: (r) => ({_time: r._time, max: r._value}), mergeKey: true)
|
||||
|> rename(columns: {_value: "max"})
|
||||
|> yield(name: "1")
|
||||
`,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -9,8 +9,9 @@ func init() {
|
|||
from(bucketID: "")
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true)
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> rename(columns: {_value: "value"})
|
||||
|> yield(name: "0")
|
||||
`,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -10,8 +10,9 @@ from(bucketID: "")
|
|||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> filter(fn: (r) => r["host"] == "server01")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true)
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> rename(columns: {_value: "value"})
|
||||
|> yield(name: "0")
|
||||
`,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -10,8 +10,9 @@ from(bucketID: "")
|
|||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> filter(fn: (r) => r["host"] =~ /.*er01/)
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true)
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> rename(columns: {_value: "value"})
|
||||
|> yield(name: "0")
|
||||
`,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -11,8 +11,9 @@ func init() {
|
|||
`+fmt.Sprintf(`from(bucketID: "%s")`, altBucketID.String())+`
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> map(fn: (r) => ({_time: r._time, value: r._value}), mergeKey: true)
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> rename(columns: {_value: "value"})
|
||||
|> yield(name: "0")
|
||||
`,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -36,9 +36,10 @@ func init() {
|
|||
` + fmt.Sprintf(`from(bucketID: "%s")`, bucketID.String()) + `
|
||||
|> range(start: 1677-09-21T00:12:43.145224194Z, stop: 2262-04-11T23:47:16.854775806Z)
|
||||
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "value")
|
||||
|> group(columns: ["_measurement", "_start"], mode: "by")
|
||||
|> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
|
||||
|> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
|
||||
|> ` + name + `()
|
||||
|> map(fn: (r) => ({_time: r._time, ` + name + `: r._value}), mergeKey: true)
|
||||
|> rename(columns: {_value: "` + name + `"})
|
||||
|> yield(name: "0")
|
||||
`
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -90,7 +90,8 @@ func (f *fixture) Run(t *testing.T) {
|
|||
t.Run(f.stmt, func(t *testing.T) {
|
||||
wantAST := parser.ParseSource(f.want)
|
||||
if ast.Check(wantAST) > 0 {
|
||||
t.Fatal("found parser errors in the want text")
|
||||
err := ast.GetError(wantAST)
|
||||
t.Fatalf("found parser errors in the want text: %s", err.Error())
|
||||
}
|
||||
want := ast.Format(wantAST)
|
||||
|
||||
|
|
|
|||
|
|
@ -285,12 +285,13 @@ func TestTranspiler_Compile(t *testing.T) {
|
|||
{s: `SELECT non_negative_derivative(value, 10) FROM myseries`, err: `second argument to non_negative_derivative must be a duration, got *influxql.IntegerLiteral`},
|
||||
{s: `SELECT difference(field1), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
|
||||
{s: `SELECT difference() from myseries`, err: `invalid number of arguments for difference, expected 1, got 0`},
|
||||
{s: `SELECT difference(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to difference`},
|
||||
{s: `SELECT difference(top(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for top, expected at least 2, got 1`},
|
||||
{s: `SELECT difference(bottom(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for bottom, expected at least 2, got 1`},
|
||||
{s: `SELECT difference(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
|
||||
{s: `SELECT difference(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
|
||||
{s: `SELECT difference(mean(value)) FROM myseries where time < now() and time > now() - 1d`, err: `difference aggregate requires a GROUP BY interval`},
|
||||
// TODO(ethan): https://github.com/influxdata/influxdb/issues/17115
|
||||
//{s: `SELECT difference(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to difference`},
|
||||
//{s: `SELECT difference(top(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for top, expected at least 2, got 1`},
|
||||
//{s: `SELECT difference(bottom(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for bottom, expected at least 2, got 1`},
|
||||
//{s: `SELECT difference(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
|
||||
//{s: `SELECT difference(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
|
||||
//{s: `SELECT difference(mean(value)) FROM myseries where time < now() and time > now() - 1d`, err: `difference aggregate requires a GROUP BY interval`},
|
||||
{s: `SELECT non_negative_difference(field1), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
|
||||
{s: `SELECT non_negative_difference() from myseries`, err: `invalid number of arguments for non_negative_difference, expected 1, got 0`},
|
||||
{s: `SELECT non_negative_difference(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to non_negative_difference`},
|
||||
|
|
|
|||
Loading…
Reference in New Issue