diff --git a/query/influxql/cursor.go b/query/influxql/cursor.go index f9ed394fb8..29105e8c63 100644 --- a/query/influxql/cursor.go +++ b/query/influxql/cursor.go @@ -55,12 +55,9 @@ func createVarRefCursor(t *transpilerState, ref *influxql.VarRef) (cursor, error }) valuer := influxql.NowValuer{Now: t.now} - cond, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer) + _, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer) if err != nil { return nil, err - } else if cond != nil { - // TODO(jsternberg): Handle conditions. - return nil, errors.New("unimplemented: conditions have not been implemented yet") } range_ := t.op("range", &functions.RangeOpSpec{ @@ -120,3 +117,12 @@ func (c *varRefCursor) Value(expr influxql.Expr) (string, bool) { } return "", false } + +// opCursor wraps a cursor with a new id while delegating all calls to the +// wrapped cursor. +type opCursor struct { + id query.OperationID + cursor +} + +func (c *opCursor) ID() query.OperationID { return c.id } diff --git a/query/influxql/group.go b/query/influxql/group.go index 30e98c41ea..659924a109 100644 --- a/query/influxql/group.go +++ b/query/influxql/group.go @@ -4,6 +4,8 @@ import ( "github.com/influxdata/influxql" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/functions" + "github.com/influxdata/platform/query/semantic" + "github.com/pkg/errors" ) type groupInfo struct { @@ -64,6 +66,7 @@ func identifyGroups(stmt *influxql.SelectStatement) []*groupInfo { func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) { // Create all of the cursors for every variable reference. + // TODO(jsternberg): Determine which of these cursors are from fields and which are tags. var cursors []cursor if gr.call != nil { ref := gr.call.Args[0].(*influxql.VarRef) @@ -82,21 +85,93 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) { cursors = append(cursors, cur) } + // TODO(jsternberg): Establish which variables in the condition are tags and which are fields. + // We need to create the references to fields here so they can be joined. + var ( + tags map[influxql.VarRef]struct{} + cond influxql.Expr + ) + valuer := influxql.NowValuer{Now: t.now} + if t.stmt.Condition != nil { + var err error + if cond, _, err = influxql.ConditionExpr(t.stmt.Condition, &valuer); err != nil { + return nil, err + } else if cond != nil { + tags = make(map[influxql.VarRef]struct{}) + + // Walk through the condition for every variable reference. There will be no function + // calls here. + var condErr error + influxql.WalkFunc(cond, func(node influxql.Node) { + if condErr != nil { + return + } + ref, ok := node.(*influxql.VarRef) + if !ok { + return + } + + // If the variable reference is in any of the cursors, it is definitely + // a field and we do not have to inspect it further. + for _, cur := range cursors { + if _, ok := cur.Value(ref); ok { + return + } + } + + // This may be a field or a tag. If it is a field, we need to create the cursor + // and add it to the listing of cursors so it can be joined before we evaluate the condition. + switch typ := t.mapType(ref); typ { + case influxql.Tag: + // Add this variable name to the listing of tags. + tags[*ref] = struct{}{} + default: + cur, err := createVarRefCursor(t, ref) + if err != nil { + condErr = err + return + } + cursors = append(cursors, cur) + } + }) + } + } + // Join the cursors using an inner join. // TODO(jsternberg): We need to differentiate between various join types and this needs to be // except: ["_field"] rather than joining on the _measurement. This also needs to specify what the time // column should be. cur := Join(t, cursors, []string{"_measurement"}, nil) + if len(tags) > 0 { + cur = &tagsCursor{cursor: cur, tags: tags} + } - // TODO(jsternberg): Handle conditions, function calls, multiple variable references, and basically - // everything that needs to be done to create a cursor for a single group. + // Evaluate the conditional and insert a filter if a condition exists. + if cond != nil { + // Generate a filter expression by evaluating the condition and wrapping it in a filter op. + expr, err := t.mapField(cond, cur) + if err != nil { + return nil, errors.Wrap(err, "unable to evaluate condition") + } + id := t.op("filter", &functions.FilterOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{{ + Key: &semantic.Identifier{Name: "r"}, + }}, + Body: expr, + }, + }, cur.ID()) + cur = &opCursor{id: id, cursor: cur} + } + // Group together the results. if c, err := gr.group(t, cur); err != nil { return nil, err } else { cur = c } + // If a function call is present, evaluate the function call. if gr.call != nil { c, err := createFunctionCursor(t, gr.call, cur) if err != nil { @@ -121,3 +196,22 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) { } func (c *groupCursor) ID() query.OperationID { return c.id } + +// tagsCursor is a pseudo-cursor that can be used to access tags within the cursor. +type tagsCursor struct { + cursor + tags map[influxql.VarRef]struct{} +} + +func (c *tagsCursor) Value(expr influxql.Expr) (string, bool) { + if value, ok := c.cursor.Value(expr); ok { + return value, ok + } + + if ref, ok := expr.(*influxql.VarRef); ok { + if _, ok := c.tags[*ref]; ok { + return ref.Val, true + } + } + return "", false +} diff --git a/query/influxql/transpiler.go b/query/influxql/transpiler.go index 56ebfec0fc..77d9eb8914 100644 --- a/query/influxql/transpiler.go +++ b/query/influxql/transpiler.go @@ -92,6 +92,11 @@ func (t *transpilerState) Transpile(ctx context.Context) (*query.Spec, error) { return t.spec, nil } +func (t *transpilerState) mapType(ref *influxql.VarRef) influxql.DataType { + // TODO(jsternberg): Actually evaluate the type against the schema. + return influxql.Tag +} + func (t *transpilerState) op(name string, spec query.OperationSpec, parents ...query.OperationID) query.OperationID { op := query.Operation{ ID: query.OperationID(fmt.Sprintf("%s%d", name, t.nextID[name])), diff --git a/query/influxql/transpiler_test.go b/query/influxql/transpiler_test.go index ecd8bfd27b..34fa0fef1f 100644 --- a/query/influxql/transpiler_test.go +++ b/query/influxql/transpiler_test.go @@ -728,6 +728,297 @@ func TestTranspiler(t *testing.T) { }, }, }, + { + s: `SELECT mean(value) FROM db0..cpu WHERE host = 'server01'`, + spec: &query.Spec{ + Operations: []*query.Operation{ + { + ID: "from0", + Spec: &functions.FromOpSpec{ + Database: "db0", + }, + }, + { + ID: "range0", + Spec: &functions.RangeOpSpec{ + Start: query.Time{Absolute: time.Unix(0, influxqllib.MinTime)}, + Stop: query.Time{Absolute: time.Unix(0, influxqllib.MaxTime)}, + }, + }, + { + ID: "filter0", + Spec: &functions.FilterOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{ + {Key: &semantic.Identifier{Name: "r"}}, + }, + Body: &semantic.LogicalExpression{ + Operator: ast.AndOperator, + Left: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_measurement", + }, + Right: &semantic.StringLiteral{ + Value: "cpu", + }, + }, + Right: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_field", + }, + Right: &semantic.StringLiteral{ + Value: "value", + }, + }, + }, + }, + }, + }, + { + ID: "filter1", + Spec: &functions.FilterOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{ + {Key: &semantic.Identifier{Name: "r"}}, + }, + Body: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "host", + }, + Right: &semantic.StringLiteral{ + Value: "server01", + }, + }, + }, + }, + }, + { + ID: "group0", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement"}, + }, + }, + { + ID: "mean0", + Spec: &functions.MeanOpSpec{ + AggregateConfig: execute.AggregateConfig{ + TimeSrc: execute.DefaultStartColLabel, + TimeDst: execute.DefaultTimeColLabel, + Columns: []string{execute.DefaultValueColLabel}, + }, + }, + }, + { + ID: "map0", + Spec: &functions.MapOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{{ + Key: &semantic.Identifier{Name: "r"}, + }}, + Body: &semantic.ObjectExpression{ + Properties: []*semantic.Property{ + { + Key: &semantic.Identifier{Name: "time"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_time", + }, + }, + { + Key: &semantic.Identifier{Name: "_measurement"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_measurement", + }, + }, + { + Key: &semantic.Identifier{Name: "mean"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_value", + }, + }, + }, + }, + }, + }, + }, + { + ID: "yield0", + Spec: &functions.YieldOpSpec{ + Name: "0", + }, + }, + }, + Edges: []query.Edge{ + {Parent: "from0", Child: "range0"}, + {Parent: "range0", Child: "filter0"}, + {Parent: "filter0", Child: "filter1"}, + {Parent: "filter1", Child: "group0"}, + {Parent: "group0", Child: "mean0"}, + {Parent: "mean0", Child: "map0"}, + {Parent: "map0", Child: "yield0"}, + }, + }, + }, + { + s: `SELECT value FROM db0..cpu WHERE host = 'server01'`, + spec: &query.Spec{ + Operations: []*query.Operation{ + { + ID: "from0", + Spec: &functions.FromOpSpec{ + Database: "db0", + }, + }, + { + ID: "range0", + Spec: &functions.RangeOpSpec{ + Start: query.Time{Absolute: time.Unix(0, influxqllib.MinTime)}, + Stop: query.Time{Absolute: time.Unix(0, influxqllib.MaxTime)}, + }, + }, + { + ID: "filter0", + Spec: &functions.FilterOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{ + {Key: &semantic.Identifier{Name: "r"}}, + }, + Body: &semantic.LogicalExpression{ + Operator: ast.AndOperator, + Left: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_measurement", + }, + Right: &semantic.StringLiteral{ + Value: "cpu", + }, + }, + Right: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_field", + }, + Right: &semantic.StringLiteral{ + Value: "value", + }, + }, + }, + }, + }, + }, + { + ID: "filter1", + Spec: &functions.FilterOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{ + {Key: &semantic.Identifier{Name: "r"}}, + }, + Body: &semantic.BinaryExpression{ + Operator: ast.EqualOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "host", + }, + Right: &semantic.StringLiteral{ + Value: "server01", + }, + }, + }, + }, + }, + { + ID: "group0", + Spec: &functions.GroupOpSpec{ + By: []string{"_measurement"}, + }, + }, + { + ID: "map0", + Spec: &functions.MapOpSpec{ + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{{ + Key: &semantic.Identifier{Name: "r"}, + }}, + Body: &semantic.ObjectExpression{ + Properties: []*semantic.Property{ + { + Key: &semantic.Identifier{Name: "time"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_time", + }, + }, + { + Key: &semantic.Identifier{Name: "_measurement"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_measurement", + }, + }, + { + Key: &semantic.Identifier{Name: "value"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_value", + }, + }, + }, + }, + }, + }, + }, + { + ID: "yield0", + Spec: &functions.YieldOpSpec{ + Name: "0", + }, + }, + }, + Edges: []query.Edge{ + {Parent: "from0", Child: "range0"}, + {Parent: "range0", Child: "filter0"}, + {Parent: "filter0", Child: "filter1"}, + {Parent: "filter1", Child: "group0"}, + {Parent: "group0", Child: "map0"}, + {Parent: "map0", Child: "yield0"}, + }, + }, + }, } { t.Run(tt.s, func(t *testing.T) { if err := tt.spec.Validate(); err != nil {