refactor(query/influxql): follow the transpiler readme
There are a few changes to how the transpiler works. The first is that the streams are now abstracted behind a `cursor` interface. The interface keeps track of which AST nodes (like variables or function calls) are represented by the data inside of the stream and the method of how to access the underlying data. This makes it easier to make a generic interface for things like the join and map operations. This also makes it easier to, in the future, use the same code from the map operation for a filter so we can implement conditions. This also follows the transpiler readme's methods and takes advantage of the updates to the ifql language. This means it will group the relevant cursors into a cursor group, perform any necessary joins, and allow us to continue building on this as we flesh out more parts of the transpiler and the language. The cursor interface makes it so we no longer have to keep a symbol table mapping the generated names to the locations, because that is all kept within the incoming cursor rather than as a separate data structure. It also splits the transpiler into more files so it is easier to find the relevant code for each stage of the transpiler.
parent
8a5d649d3f
commit
1b31a1150c
|
@ -64,13 +64,7 @@ After creating the base cursors, each of them is joined into a single stream usi
|
|||
val1 = create_cursor(db: "telegraf", start: -5m, m: "cpu", f: "usage_system")
|
||||
inner_join(tables: {val1: val1, val2: val2}, except: ["_field"], fn: (tables) => {val1: tables.val1, val2: tables.val2})
|
||||
|
||||
If there is only one cursor, then we need to exclude the `_field` column from the table so the table schema matches and we can perform another join later. The `_field` property is not needed after the initial cursor creation.
|
||||
|
||||
**TODO(jsternberg):** This step can probably be done by a drop column function when/if it exists.
|
||||
|
||||
> SELECT usage_user FROM telegraf..cpu WHERE time >= now() - 5m
|
||||
val1 = create_cursor(db: "telegraf", start: -5m, m: "cpu", f: "usage_user")
|
||||
val1 |> group(except: ["_field"])
|
||||
If there is only one cursor, then nothing needs to be done.
|
||||
|
||||
### <a name="evaluate-condition"></a> Evaluate the condition
|
||||
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
package influxql
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/ast"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
)
|
||||
|
||||
// cursor holds known information about the current stream. It maps the influxql AST information
// to the attributes on a table.
type cursor interface {
	// ID contains the last id that produces this cursor.
	ID() query.OperationID

	// Keys returns all of the expressions that this cursor contains.
	Keys() []influxql.Expr

	// Value returns the string that can be used to access the computed expression.
	// If this cursor does not produce the expression, this returns false for the second
	// return argument.
	Value(expr influxql.Expr) (string, bool)
}
|
||||
|
||||
// varRefCursor contains a cursor for a single variable. This is usually the raw value
// coming from the database and points to the default value column property.
type varRefCursor struct {
	// id is the operation that produced this cursor's stream.
	id query.OperationID
	// ref is the variable reference this cursor gives access to.
	ref *influxql.VarRef
}
|
||||
|
||||
// createVarRefCursor creates a new cursor from a variable reference using the sources
|
||||
// in the transpilerState.
|
||||
func createVarRefCursor(t *transpilerState, ref *influxql.VarRef) (cursor, error) {
|
||||
if len(t.stmt.Sources) != 1 {
|
||||
// TODO(jsternberg): Support multiple sources.
|
||||
return nil, errors.New("unimplemented: only one source is allowed")
|
||||
}
|
||||
|
||||
// Only support a direct measurement. Subqueries are not supported yet.
|
||||
mm, ok := t.stmt.Sources[0].(*influxql.Measurement)
|
||||
if !ok {
|
||||
return nil, errors.New("unimplemented: source must be a measurement")
|
||||
}
|
||||
|
||||
// Create the from spec and add it to the list of operations.
|
||||
// TODO(jsternberg): Autogenerate these IDs and track the resulting operation
|
||||
// so we can reference them from other locations.
|
||||
from := t.op("from", &functions.FromOpSpec{
|
||||
Database: mm.Database,
|
||||
})
|
||||
|
||||
valuer := influxql.NowValuer{Now: t.now}
|
||||
cond, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if cond != nil {
|
||||
// TODO(jsternberg): Handle conditions.
|
||||
return nil, errors.New("unimplemented: conditions have not been implemented yet")
|
||||
}
|
||||
|
||||
range_ := t.op("range", &functions.RangeOpSpec{
|
||||
Start: query.Time{Absolute: tr.MinTime()},
|
||||
Stop: query.Time{Absolute: tr.MaxTime()},
|
||||
}, from)
|
||||
|
||||
id := t.op("filter", &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{
|
||||
{Key: &semantic.Identifier{Name: "r"}},
|
||||
},
|
||||
Body: &semantic.LogicalExpression{
|
||||
Operator: ast.AndOperator,
|
||||
Left: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{Name: "r"},
|
||||
Property: "_measurement",
|
||||
},
|
||||
Right: &semantic.StringLiteral{Value: mm.Name},
|
||||
},
|
||||
Right: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{Name: "r"},
|
||||
Property: "_field",
|
||||
},
|
||||
Right: &semantic.StringLiteral{Value: ref.Val},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, range_)
|
||||
return &varRefCursor{
|
||||
id: id,
|
||||
ref: ref,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *varRefCursor) ID() query.OperationID {
|
||||
return c.id
|
||||
}
|
||||
|
||||
func (c *varRefCursor) Keys() []influxql.Expr {
|
||||
return []influxql.Expr{c.ref}
|
||||
}
|
||||
|
||||
func (c *varRefCursor) Value(expr influxql.Expr) (string, bool) {
|
||||
ref, ok := expr.(*influxql.VarRef)
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
|
||||
// If these are the same variable reference (by pointer), then they are equal.
|
||||
if ref == c.ref || *ref == *c.ref {
|
||||
return execute.DefaultValueColLabel, true
|
||||
}
|
||||
return "", false
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
package influxql
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
)
|
||||
|
||||
// createFunctionCursor creates a new cursor that calls a function on one of the columns
|
||||
// and returns the result.
|
||||
func createFunctionCursor(t *transpilerState, call *influxql.Call, in cursor) (cursor, error) {
|
||||
cur := &functionCursor{
|
||||
call: call,
|
||||
parent: in,
|
||||
}
|
||||
switch call.Name {
|
||||
case "mean":
|
||||
value, ok := in.Value(call.Args[0])
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("undefined variable: %s", call.Args[0])
|
||||
}
|
||||
cur.id = t.op("mean", &functions.MeanOpSpec{
|
||||
AggregateConfig: execute.AggregateConfig{
|
||||
Columns: []string{value},
|
||||
TimeSrc: execute.DefaultStartColLabel,
|
||||
TimeDst: execute.DefaultTimeColLabel,
|
||||
},
|
||||
}, in.ID())
|
||||
cur.value = value
|
||||
cur.exclude = map[influxql.Expr]struct{}{call.Args[0]: {}}
|
||||
case "max":
|
||||
value, ok := in.Value(call.Args[0])
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("undefined variable: %s", call.Args[0])
|
||||
}
|
||||
cur.id = t.op("max", &functions.MaxOpSpec{
|
||||
SelectorConfig: execute.SelectorConfig{
|
||||
Column: value,
|
||||
},
|
||||
}, in.ID())
|
||||
cur.value = value
|
||||
cur.exclude = map[influxql.Expr]struct{}{call.Args[0]: {}}
|
||||
default:
|
||||
return nil, errors.New("unimplemented")
|
||||
}
|
||||
return cur, nil
|
||||
}
|
||||
|
||||
// functionCursor wraps a parent cursor with the result of a function call.
// Expressions consumed by the call are recorded in exclude so they are
// hidden from later lookups.
type functionCursor struct {
	// id is the operation that produced this cursor's stream.
	id query.OperationID
	// call is the influxql function call this cursor represents.
	call *influxql.Call
	// value is the column name holding the call's result.
	value string
	// exclude marks parent expressions consumed by the call.
	exclude map[influxql.Expr]struct{}
	// parent is the cursor the function was applied to.
	parent cursor
}

// ID contains the last id that produces this cursor.
func (c *functionCursor) ID() query.OperationID {
	return c.id
}
|
||||
|
||||
func (c *functionCursor) Keys() []influxql.Expr {
|
||||
keys := []influxql.Expr{c.call}
|
||||
if a := c.parent.Keys(); len(a) > 0 {
|
||||
for _, e := range a {
|
||||
if _, ok := c.exclude[e]; ok {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, e)
|
||||
}
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (c *functionCursor) Value(expr influxql.Expr) (string, bool) {
|
||||
if expr == c.call {
|
||||
return c.value, true
|
||||
} else if _, ok := c.exclude[expr]; ok {
|
||||
return "", false
|
||||
}
|
||||
return c.parent.Value(expr)
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
package influxql
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
)
|
||||
|
||||
// groupInfo describes one set of expressions that share a data access cursor:
// at most one function call plus the variable references read alongside it.
type groupInfo struct {
	// call is the optional aggregate/selector call for this group.
	call *influxql.Call
	// refs are the raw variable references in this group.
	refs []*influxql.VarRef
}

// groupVisitor walks the fields of a select statement and collects the
// groups of expressions that will become cursors.
type groupVisitor struct {
	groups []*groupInfo
}
|
||||
|
||||
func (v *groupVisitor) Visit(n influxql.Node) influxql.Visitor {
|
||||
// TODO(jsternberg): Identify duplicates so they are a single common instance.
|
||||
switch expr := n.(type) {
|
||||
case *influxql.Call:
|
||||
// TODO(jsternberg): Identify math functions so we visit their arguments instead of recording them.
|
||||
// If we have a single group, it does not contain a call, and this is a selector, make this
|
||||
// the function call for the first group.
|
||||
if len(v.groups) > 0 && influxql.IsSelector(expr) && v.groups[0].call == nil {
|
||||
v.groups[0].call = expr
|
||||
} else {
|
||||
// Otherwise, we create a new group and place this expression as the call.
|
||||
v.groups = append(v.groups, &groupInfo{call: expr})
|
||||
}
|
||||
return nil
|
||||
case *influxql.VarRef:
|
||||
// If we have one group, add this as a variable reference to that group.
|
||||
// If we have zero, then create the first group. If there are multiple groups,
|
||||
// that's technically a query error, but we'll capture that somewhere else before
|
||||
// this (maybe).
|
||||
// TODO(jsternberg): Identify invalid queries where an aggregate is used with a raw value.
|
||||
if len(v.groups) == 0 {
|
||||
v.groups = append(v.groups, &groupInfo{})
|
||||
}
|
||||
v.groups[0].refs = append(v.groups[0].refs, expr)
|
||||
return nil
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// identifyGroups will identify the groups for creating data access cursors.
|
||||
func identifyGroups(stmt *influxql.SelectStatement) []*groupInfo {
|
||||
// Try to estimate the number of groups. This isn't a very important step so we
|
||||
// don't care if we are wrong. If this is a raw query, then the size is going to be 1.
|
||||
// If this is an aggregate, the size will probably be the number of fields.
|
||||
// If this is a selector, the size will be 1 again so we'll just get this wrong.
|
||||
sizeHint := 1
|
||||
if !stmt.IsRawQuery {
|
||||
sizeHint = len(stmt.Fields)
|
||||
}
|
||||
|
||||
v := &groupVisitor{
|
||||
groups: make([]*groupInfo, 0, sizeHint),
|
||||
}
|
||||
influxql.Walk(v, stmt.Fields)
|
||||
return v.groups
|
||||
}
|
||||
|
||||
func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {
|
||||
// Create all of the cursors for every variable reference.
|
||||
var cursors []cursor
|
||||
if gr.call != nil {
|
||||
ref := gr.call.Args[0].(*influxql.VarRef)
|
||||
cur, err := createVarRefCursor(t, ref)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cursors = append(cursors, cur)
|
||||
}
|
||||
|
||||
for _, ref := range gr.refs {
|
||||
cur, err := createVarRefCursor(t, ref)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cursors = append(cursors, cur)
|
||||
}
|
||||
|
||||
// Join the cursors using an inner join.
|
||||
// TODO(jsternberg): We need to differentiate between various join types and this needs to be
|
||||
// except: ["_field"] rather than joining on the _measurement. This also needs to specify what the time
|
||||
// column should be.
|
||||
cur := Join(t, cursors, []string{"_measurement"}, nil)
|
||||
|
||||
// TODO(jsternberg): Handle conditions, function calls, multiple variable references, and basically
|
||||
// everything that needs to be done to create a cursor for a single group.
|
||||
|
||||
if c, err := gr.group(t, cur); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
cur = c
|
||||
}
|
||||
|
||||
if gr.call != nil {
|
||||
c, err := createFunctionCursor(t, gr.call, cur)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cur = c
|
||||
}
|
||||
return cur, nil
|
||||
}
|
||||
|
||||
type groupCursor struct {
|
||||
cursor
|
||||
id query.OperationID
|
||||
}
|
||||
|
||||
func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
|
||||
// TODO(jsternberg): Process group by clause correctly and windowing.
|
||||
id := t.op("group", &functions.GroupOpSpec{
|
||||
By: []string{"_measurement"},
|
||||
}, in.ID())
|
||||
return &groupCursor{id: id, cursor: in}, nil
|
||||
}
|
||||
|
||||
func (c *groupCursor) ID() query.OperationID { return c.id }
|
|
@ -0,0 +1,102 @@
|
|||
package influxql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
)
|
||||
|
||||
// joinCursor is the result of joining multiple cursors into a single stream.
type joinCursor struct {
	// id is the join operation that produced this cursor.
	id query.OperationID
	// m maps each joined expression to its generated column name.
	m map[influxql.Expr]string
	// exprs preserves the order in which the expressions were joined.
	exprs []influxql.Expr
}
|
||||
|
||||
func Join(t *transpilerState, cursors []cursor, on, except []string) cursor {
|
||||
if len(cursors) == 1 {
|
||||
return cursors[0]
|
||||
}
|
||||
|
||||
// Iterate through each cursor and each expression within each cursor to assign them an id.
|
||||
var (
|
||||
exprs []influxql.Expr
|
||||
properties []*semantic.Property
|
||||
)
|
||||
m := make(map[influxql.Expr]string)
|
||||
tables := make(map[query.OperationID]string)
|
||||
for i, cur := range cursors {
|
||||
// Record this incoming cursor within the table.
|
||||
tableName := fmt.Sprintf("t%d", i)
|
||||
tables[cur.ID()] = tableName
|
||||
|
||||
for _, k := range cur.Keys() {
|
||||
// Generate a name for accessing this expression and generate the index entries for it.
|
||||
name := fmt.Sprintf("val%d", len(exprs))
|
||||
exprs = append(exprs, k)
|
||||
m[k] = name
|
||||
|
||||
property := &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: name},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "tables",
|
||||
},
|
||||
Property: tableName,
|
||||
},
|
||||
}
|
||||
if valName, _ := cur.Value(k); valName != execute.DefaultValueColLabel {
|
||||
property.Value = &semantic.MemberExpression{
|
||||
Object: property.Value,
|
||||
Property: valName,
|
||||
}
|
||||
}
|
||||
properties = append(properties, property)
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve the parent tables from the tables map.
|
||||
parents := make([]query.OperationID, 0, len(tables))
|
||||
for id := range tables {
|
||||
parents = append(parents, id)
|
||||
}
|
||||
id := t.op("join", &functions.JoinOpSpec{
|
||||
TableNames: tables,
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "tables"},
|
||||
}},
|
||||
Body: &semantic.ObjectExpression{
|
||||
Properties: properties,
|
||||
},
|
||||
},
|
||||
On: on,
|
||||
// TODO(jsternberg): This option needs to be included.
|
||||
//Except: except,
|
||||
}, parents...)
|
||||
return &joinCursor{
|
||||
id: id,
|
||||
m: m,
|
||||
exprs: exprs,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *joinCursor) ID() query.OperationID {
|
||||
return c.id
|
||||
}
|
||||
|
||||
func (c *joinCursor) Keys() []influxql.Expr {
|
||||
keys := make([]influxql.Expr, 0, len(c.m))
|
||||
for k := range c.m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (c *joinCursor) Value(expr influxql.Expr) (string, bool) {
|
||||
value, ok := c.m[expr]
|
||||
return value, ok
|
||||
}
|
|
@ -0,0 +1,189 @@
|
|||
package influxql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/ast"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
)
|
||||
|
||||
// mapCursor holds the mapping of expressions to specific fields that happens at the end of
// the transpilation.
// TODO(jsternberg): This abstraction might be useful for subqueries, but we only need the id
// at the moment so just hold that.
type mapCursor struct {
	id query.OperationID
}

// ID contains the last id that produces this cursor.
func (c *mapCursor) ID() query.OperationID {
	return c.id
}

// Keys is not needed after the final map and is intentionally unimplemented.
func (c *mapCursor) Keys() []influxql.Expr {
	panic("unimplemented")
}

// Value is not needed after the final map and is intentionally unimplemented.
func (c *mapCursor) Value(expr influxql.Expr) (string, bool) {
	panic("unimplemented")
}
|
||||
|
||||
// mapFields will take the list of symbols and maps each of the operations
|
||||
// using the column names.
|
||||
func (t *transpilerState) mapFields(in cursor) (cursor, error) {
|
||||
columns := t.stmt.ColumnNames()
|
||||
if len(columns) != len(t.stmt.Fields) {
|
||||
// TODO(jsternberg): This scenario should not be possible. Replace the use of ColumnNames with a more
|
||||
// statically verifiable list of columns when we process the fields from the select statement instead
|
||||
// of doing this in the future.
|
||||
panic("number of columns does not match the number of fields")
|
||||
}
|
||||
|
||||
properties := make([]*semantic.Property, 0, len(t.stmt.Fields)+2)
|
||||
properties = append(properties, &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: "time"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: execute.DefaultTimeColLabel,
|
||||
},
|
||||
})
|
||||
properties = append(properties, &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: "_measurement"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_measurement",
|
||||
},
|
||||
})
|
||||
for i, f := range t.stmt.Fields {
|
||||
value, err := t.mapField(f.Expr, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
properties = append(properties, &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: columns[i]},
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
id := t.op("map", &functions.MapOpSpec{Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: &semantic.ObjectExpression{
|
||||
Properties: properties,
|
||||
},
|
||||
}}, in.ID())
|
||||
return &mapCursor{id: id}, nil
|
||||
}
|
||||
|
||||
// mapField converts a single influxql expression into the equivalent semantic
// expression, resolving already-computed symbols through the cursor first.
func (t *transpilerState) mapField(expr influxql.Expr, in cursor) (semantic.Expression, error) {
	// Expressions the cursor already computed are read from their column.
	if sym, ok := in.Value(expr); ok {
		return &semantic.MemberExpression{
			Object: &semantic.IdentifierExpression{
				Name: "r",
			},
			Property: sym,
		}, nil
	}

	switch expr := expr.(type) {
	case *influxql.Call, *influxql.VarRef:
		// Calls and references must have been produced by the cursor above.
		return nil, fmt.Errorf("missing symbol for %s", expr)
	case *influxql.BinaryExpr:
		return t.evalBinaryExpr(expr, in)
	case *influxql.ParenExpr:
		// Parentheses only affect parsing; unwrap and retry.
		return t.mapField(expr.Expr, in)
	case *influxql.StringLiteral:
		// A string that parses as a time becomes a datetime literal.
		if ts, err := expr.ToTimeLiteral(time.UTC); err == nil {
			return &semantic.DateTimeLiteral{Value: ts.Val}, nil
		}
		return &semantic.StringLiteral{Value: expr.Val}, nil
	case *influxql.NumberLiteral:
		return &semantic.FloatLiteral{Value: expr.Val}, nil
	case *influxql.IntegerLiteral:
		return &semantic.IntegerLiteral{Value: expr.Val}, nil
	case *influxql.BooleanLiteral:
		return &semantic.BooleanLiteral{Value: expr.Val}, nil
	case *influxql.DurationLiteral:
		return &semantic.DurationLiteral{Value: expr.Val}, nil
	case *influxql.TimeLiteral:
		return &semantic.DateTimeLiteral{Value: expr.Val}, nil
	default:
		// TODO(jsternberg): Handle the other expressions by turning them into
		// an equivalent expression.
		return nil, fmt.Errorf("unimplemented: %s", expr)
	}
}
|
||||
|
||||
func (t *transpilerState) evalBinaryExpr(expr *influxql.BinaryExpr, in cursor) (semantic.Expression, error) {
|
||||
fn := func() func(left, right semantic.Expression) semantic.Expression {
|
||||
b := evalBuilder{}
|
||||
switch expr.Op {
|
||||
case influxql.EQ:
|
||||
return b.eval(ast.EqualOperator)
|
||||
case influxql.NEQ:
|
||||
return b.eval(ast.NotEqualOperator)
|
||||
case influxql.GT:
|
||||
return b.eval(ast.GreaterThanOperator)
|
||||
case influxql.GTE:
|
||||
return b.eval(ast.GreaterThanEqualOperator)
|
||||
case influxql.LT:
|
||||
return b.eval(ast.LessThanOperator)
|
||||
case influxql.LTE:
|
||||
return b.eval(ast.LessThanEqualOperator)
|
||||
case influxql.ADD:
|
||||
return b.eval(ast.AdditionOperator)
|
||||
case influxql.SUB:
|
||||
return b.eval(ast.SubtractionOperator)
|
||||
case influxql.AND:
|
||||
return b.logical(ast.AndOperator)
|
||||
case influxql.OR:
|
||||
return b.logical(ast.OrOperator)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
if fn == nil {
|
||||
return nil, fmt.Errorf("unimplemented binary expression: %s", expr)
|
||||
}
|
||||
|
||||
lhs, err := t.mapField(expr.LHS, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rhs, err := t.mapField(expr.RHS, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fn(lhs, rhs), nil
|
||||
}
|
||||
|
||||
// evalBuilder is used for namespacing the logical and eval wrapping functions.
|
||||
type evalBuilder struct{}
|
||||
|
||||
func (evalBuilder) logical(op ast.LogicalOperatorKind) func(left, right semantic.Expression) semantic.Expression {
|
||||
return func(left, right semantic.Expression) semantic.Expression {
|
||||
return &semantic.LogicalExpression{
|
||||
Operator: op,
|
||||
Left: left,
|
||||
Right: right,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (evalBuilder) eval(op ast.OperatorKind) func(left, right semantic.Expression) semantic.Expression {
|
||||
return func(left, right semantic.Expression) semantic.Expression {
|
||||
return &semantic.BinaryExpression{
|
||||
Operator: op,
|
||||
Left: left,
|
||||
Right: right,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,10 +8,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/ast"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
)
|
||||
|
||||
// Transpiler converts InfluxQL queries into a query spec.
|
||||
|
@ -44,15 +41,10 @@ func (t *Transpiler) Transpile(ctx context.Context, txt string) (*query.Spec, er
|
|||
}
|
||||
|
||||
type transpilerState struct {
|
||||
stmt *influxql.SelectStatement
|
||||
symbols []symbol
|
||||
spec *query.Spec
|
||||
nextID map[string]int
|
||||
now time.Time
|
||||
|
||||
// calls maintains an index to each function call within the select statement
|
||||
// for easy access.
|
||||
calls map[*influxql.Call]struct{}
|
||||
stmt *influxql.SelectStatement
|
||||
spec *query.Spec
|
||||
nextID map[string]int
|
||||
now time.Time
|
||||
}
|
||||
|
||||
func newTranspilerState(stmt *influxql.SelectStatement) *transpilerState {
|
||||
|
@ -68,209 +60,38 @@ func newTranspilerState(stmt *influxql.SelectStatement) *transpilerState {
|
|||
return state
|
||||
}
|
||||
|
||||
type selectInfo struct {
|
||||
names []string
|
||||
exprs map[string]influxql.Expr
|
||||
calls map[*influxql.Call]struct{}
|
||||
refs map[*influxql.VarRef]struct{}
|
||||
}
|
||||
|
||||
func newSelectInfo(n influxql.Node) *selectInfo {
|
||||
info := &selectInfo{
|
||||
exprs: make(map[string]influxql.Expr),
|
||||
calls: make(map[*influxql.Call]struct{}),
|
||||
refs: make(map[*influxql.VarRef]struct{}),
|
||||
}
|
||||
influxql.Walk(info, n)
|
||||
return info
|
||||
}
|
||||
|
||||
func (s *selectInfo) ProcessExpressions(fn func(expr influxql.Expr) error) error {
|
||||
for _, name := range s.names {
|
||||
if err := fn(s.exprs[name]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *selectInfo) Visit(n influxql.Node) influxql.Visitor {
|
||||
switch n.(type) {
|
||||
case *influxql.Call, *influxql.VarRef:
|
||||
// TODO(jsternberg): Identify if this is a math function and skip over
|
||||
// it if it is.
|
||||
sym := n.String()
|
||||
if _, ok := s.exprs[sym]; ok {
|
||||
return nil
|
||||
}
|
||||
s.names = append(s.names, sym)
|
||||
s.exprs[sym] = n.(influxql.Expr)
|
||||
|
||||
switch expr := n.(type) {
|
||||
case *influxql.Call:
|
||||
s.calls[expr] = struct{}{}
|
||||
case *influxql.VarRef:
|
||||
s.refs[expr] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (t *transpilerState) Transpile(ctx context.Context) (*query.Spec, error) {
|
||||
info := newSelectInfo(t.stmt.Fields)
|
||||
if len(info.exprs) == 0 {
|
||||
// TODO(jsternberg): Find the correct error message for this.
|
||||
// This is supposed to be handled in the compiler, but we haven't added
|
||||
// that yet.
|
||||
groups := identifyGroups(t.stmt)
|
||||
if len(groups) == 0 {
|
||||
return nil, errors.New("no fields")
|
||||
}
|
||||
t.calls = info.calls
|
||||
|
||||
// Use the list of expressions to create a specification for each of the
|
||||
// values that access data and fill in the operation id in the symbol table.
|
||||
if err := info.ProcessExpressions(func(expr influxql.Expr) (err error) {
|
||||
sym := symbol{id: expr.String()}
|
||||
sym.op, err = t.createIteratorSpec(expr)
|
||||
cursors := make([]cursor, 0, len(groups))
|
||||
for _, gr := range groups {
|
||||
cur, err := gr.createCursor(t)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
t.symbols = append(t.symbols, sym)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
cursors = append(cursors, cur)
|
||||
}
|
||||
|
||||
var (
|
||||
mapIn query.OperationID
|
||||
symbolTable map[string]string
|
||||
)
|
||||
if len(t.symbols) > 1 {
|
||||
// We will need a join table for this.
|
||||
// Join the symbols together. This returns the join's operation id
|
||||
// and the mapping of expressions to the symbol in the join table.
|
||||
mapIn, symbolTable = t.join(t.symbols)
|
||||
} else {
|
||||
sym := t.symbols[0]
|
||||
symbolTable = map[string]string{sym.id: "_value"}
|
||||
mapIn = sym.op
|
||||
}
|
||||
// Join the cursors together on the measurement name.
|
||||
// TODO(jsternberg): This needs to join on all remaining partition keys.
|
||||
cur := Join(t, cursors, []string{"_measurement"}, nil)
|
||||
|
||||
// Map each of the fields in the symbol table to their appropriate column.
|
||||
mapId, err := t.mapFields(mapIn, symbolTable)
|
||||
// Map each of the fields into another cursor. This evaluates any lingering expressions.
|
||||
cur, err := t.mapFields(cur)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Yield the statement to the name zero.
|
||||
t.op("yield", &functions.YieldOpSpec{Name: "0"}, mapId)
|
||||
// Yield the cursor from the last cursor to a stream with the name of the statement id.
|
||||
// TODO(jsternberg): Include the statement id in the transpiler state when we create
|
||||
// the state so we can yield to something other than zero.
|
||||
t.op("yield", &functions.YieldOpSpec{Name: "0"}, cur.ID())
|
||||
return t.spec, nil
|
||||
}
|
||||
|
||||
func (t *transpilerState) createIteratorSpec(expr influxql.Expr) (query.OperationID, error) {
|
||||
switch expr := expr.(type) {
|
||||
case *influxql.VarRef:
|
||||
return t.processVarRef(expr)
|
||||
case *influxql.Call:
|
||||
ref, err := t.processVarRef(expr.Args[0].(*influxql.VarRef))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// TODO(jsternberg): Handle group by tags and the windowing function.
|
||||
group := t.op("group", &functions.GroupOpSpec{
|
||||
By: []string{"_measurement"},
|
||||
}, ref)
|
||||
|
||||
switch expr.Name {
|
||||
case "mean":
|
||||
return t.op("mean", &functions.MeanOpSpec{
|
||||
AggregateConfig: execute.AggregateConfig{
|
||||
Columns: []string{execute.DefaultValueColLabel},
|
||||
TimeSrc: execute.DefaultStartColLabel,
|
||||
TimeDst: execute.DefaultTimeColLabel,
|
||||
},
|
||||
}, group), nil
|
||||
case "max":
|
||||
//TODO add map after selector to handle src time
|
||||
//src := execute.DefaultStartColLabel
|
||||
//if len(t.calls) == 1 {
|
||||
// src = execute.DefaultTimeColLabel
|
||||
//}
|
||||
return t.op("max", &functions.MaxOpSpec{
|
||||
SelectorConfig: execute.SelectorConfig{
|
||||
Column: execute.DefaultValueColLabel,
|
||||
},
|
||||
}, group), nil
|
||||
}
|
||||
}
|
||||
return "", errors.New("unimplemented")
|
||||
}
|
||||
|
||||
func (t *transpilerState) processVarRef(ref *influxql.VarRef) (query.OperationID, error) {
|
||||
if len(t.stmt.Sources) != 1 {
|
||||
// TODO(jsternberg): Support multiple sources.
|
||||
return "", errors.New("unimplemented: only one source is allowed")
|
||||
}
|
||||
|
||||
// Only support a direct measurement. Subqueries are not supported yet.
|
||||
mm, ok := t.stmt.Sources[0].(*influxql.Measurement)
|
||||
if !ok {
|
||||
return "", errors.New("unimplemented: source must be a measurement")
|
||||
}
|
||||
|
||||
// TODO(jsternberg): Verify the retention policy is the default one so we avoid
|
||||
// unexpected behavior.
|
||||
|
||||
// Create the from spec and add it to the list of operations.
|
||||
// TODO(jsternberg): Autogenerate these IDs and track the resulting operation
|
||||
// so we can reference them from other locations.
|
||||
from := t.op("from", &functions.FromOpSpec{
|
||||
Database: mm.Database,
|
||||
})
|
||||
|
||||
valuer := influxql.NowValuer{Now: t.now}
|
||||
cond, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if cond != nil {
|
||||
// TODO(jsternberg): Handle conditions.
|
||||
return "", errors.New("unimplemented: conditions have not been implemented yet")
|
||||
}
|
||||
|
||||
range_ := t.op("range", &functions.RangeOpSpec{
|
||||
Start: query.Time{Absolute: tr.MinTime()},
|
||||
Stop: query.Time{Absolute: tr.MaxTime()},
|
||||
}, from)
|
||||
|
||||
return t.op("filter", &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{
|
||||
{Key: &semantic.Identifier{Name: "r"}},
|
||||
},
|
||||
Body: &semantic.LogicalExpression{
|
||||
Operator: ast.AndOperator,
|
||||
Left: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{Name: "r"},
|
||||
Property: "_measurement",
|
||||
},
|
||||
Right: &semantic.StringLiteral{Value: mm.Name},
|
||||
},
|
||||
Right: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{Name: "r"},
|
||||
Property: "_field",
|
||||
},
|
||||
Right: &semantic.StringLiteral{Value: ref.Val},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, range_), nil
|
||||
}
|
||||
|
||||
func (t *transpilerState) op(name string, spec query.OperationSpec, parents ...query.OperationID) query.OperationID {
|
||||
op := query.Operation{
|
||||
ID: query.OperationID(fmt.Sprintf("%s%d", name, t.nextID[name])),
|
||||
|
@ -286,208 +107,3 @@ func (t *transpilerState) op(name string, spec query.OperationSpec, parents ...q
|
|||
t.nextID[name]++
|
||||
return op.ID
|
||||
}
|
||||
|
||||
type symbol struct {
|
||||
id string
|
||||
op query.OperationID
|
||||
}
|
||||
|
||||
// join takes a mapping of expressions to operation ids and generates a join spec
|
||||
// that merges all of them into a single table with an output of a table that
|
||||
// contains mappings of each of the expressions to arbitrary symbol names.
|
||||
func (t *transpilerState) join(symbols []symbol) (query.OperationID, map[string]string) {
|
||||
op := &functions.JoinOpSpec{
|
||||
// TODO(jsternberg): This is wrong and the join needs to be done on
|
||||
// what is basically a wildcard for everything except field.
|
||||
// This also needs to be an outer join, which is not currently supported
|
||||
// and is not capable of being expressed in the spec.
|
||||
On: []string{"_measurement"},
|
||||
TableNames: make(map[query.OperationID]string),
|
||||
}
|
||||
|
||||
parents := make([]query.OperationID, 0, len(symbols))
|
||||
joinTable := make(map[string]string, len(symbols))
|
||||
properties := make([]*semantic.Property, 0, len(symbols))
|
||||
for _, sym := range symbols {
|
||||
name := fmt.Sprintf("val%d", len(joinTable))
|
||||
joinTable[sym.id] = name
|
||||
op.TableNames[sym.op] = name
|
||||
parents = append(parents, sym.op)
|
||||
properties = append(properties, &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: name},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "tables",
|
||||
},
|
||||
Property: name,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Generate a function that takes the tables as an input and maps
|
||||
// each of the properties to itself.
|
||||
op.Fn = &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "tables"},
|
||||
}},
|
||||
Body: &semantic.ObjectExpression{
|
||||
Properties: properties,
|
||||
},
|
||||
}
|
||||
return t.op("join", op, parents...), joinTable
|
||||
}
|
||||
|
||||
// mapFields will take the list of symbols and maps each of the operations
|
||||
// using the column names.
|
||||
func (t *transpilerState) mapFields(in query.OperationID, symbols map[string]string) (query.OperationID, error) {
|
||||
columns := t.stmt.ColumnNames()
|
||||
if len(columns) != len(t.stmt.Fields) {
|
||||
// TODO(jsternberg): This scenario should not be possible. Replace the use of ColumnNames with a more
|
||||
// statically verifiable list of columns when we process the fields from the select statement instead
|
||||
// of doing this in the future.
|
||||
panic("number of columns does not match the number of fields")
|
||||
}
|
||||
|
||||
properties := make([]*semantic.Property, 0, len(t.stmt.Fields)+2)
|
||||
properties = append(properties, &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: "time"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: execute.DefaultTimeColLabel,
|
||||
},
|
||||
})
|
||||
properties = append(properties, &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: "_measurement"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_measurement",
|
||||
},
|
||||
})
|
||||
for i, f := range t.stmt.Fields {
|
||||
value, err := t.mapField(f.Expr, symbols)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
properties = append(properties, &semantic.Property{
|
||||
Key: &semantic.Identifier{Name: columns[i]},
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
return t.op("map", &functions.MapOpSpec{Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: &semantic.ObjectExpression{
|
||||
Properties: properties,
|
||||
},
|
||||
}}, in), nil
|
||||
}
|
||||
|
||||
func (t *transpilerState) mapField(expr influxql.Expr, symbols map[string]string) (semantic.Expression, error) {
|
||||
switch expr := expr.(type) {
|
||||
case *influxql.Call, *influxql.VarRef:
|
||||
sym, ok := symbols[expr.String()]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing symbol for %s", expr)
|
||||
}
|
||||
return &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: sym,
|
||||
}, nil
|
||||
case *influxql.BinaryExpr:
|
||||
return t.evalBinaryExpr(expr, symbols)
|
||||
case *influxql.ParenExpr:
|
||||
return t.mapField(expr.Expr, symbols)
|
||||
case *influxql.StringLiteral:
|
||||
if ts, err := expr.ToTimeLiteral(time.UTC); err == nil {
|
||||
return &semantic.DateTimeLiteral{Value: ts.Val}, nil
|
||||
}
|
||||
return &semantic.StringLiteral{Value: expr.Val}, nil
|
||||
case *influxql.NumberLiteral:
|
||||
return &semantic.FloatLiteral{Value: expr.Val}, nil
|
||||
case *influxql.IntegerLiteral:
|
||||
return &semantic.IntegerLiteral{Value: expr.Val}, nil
|
||||
case *influxql.BooleanLiteral:
|
||||
return &semantic.BooleanLiteral{Value: expr.Val}, nil
|
||||
case *influxql.DurationLiteral:
|
||||
return &semantic.DurationLiteral{Value: expr.Val}, nil
|
||||
case *influxql.TimeLiteral:
|
||||
return &semantic.DateTimeLiteral{Value: expr.Val}, nil
|
||||
default:
|
||||
// TODO(jsternberg): Handle the other expressions by turning them into
|
||||
// an equivalent expression.
|
||||
return nil, fmt.Errorf("unimplemented: %s", expr)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transpilerState) evalBinaryExpr(expr *influxql.BinaryExpr, symbols map[string]string) (semantic.Expression, error) {
|
||||
fn := func() func(left, right semantic.Expression) semantic.Expression {
|
||||
b := evalBuilder{}
|
||||
switch expr.Op {
|
||||
case influxql.EQ:
|
||||
return b.eval(ast.EqualOperator)
|
||||
case influxql.NEQ:
|
||||
return b.eval(ast.NotEqualOperator)
|
||||
case influxql.GT:
|
||||
return b.eval(ast.GreaterThanOperator)
|
||||
case influxql.GTE:
|
||||
return b.eval(ast.GreaterThanEqualOperator)
|
||||
case influxql.LT:
|
||||
return b.eval(ast.LessThanOperator)
|
||||
case influxql.LTE:
|
||||
return b.eval(ast.LessThanEqualOperator)
|
||||
case influxql.ADD:
|
||||
return b.eval(ast.AdditionOperator)
|
||||
case influxql.SUB:
|
||||
return b.eval(ast.SubtractionOperator)
|
||||
case influxql.AND:
|
||||
return b.logical(ast.AndOperator)
|
||||
case influxql.OR:
|
||||
return b.logical(ast.OrOperator)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
if fn == nil {
|
||||
return nil, fmt.Errorf("unimplemented binary expression: %s", expr)
|
||||
}
|
||||
|
||||
lhs, err := t.mapField(expr.LHS, symbols)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rhs, err := t.mapField(expr.RHS, symbols)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fn(lhs, rhs), nil
|
||||
}
|
||||
|
||||
// evalBuilder is used for namespacing the logical and eval wrapping functions.
|
||||
type evalBuilder struct{}
|
||||
|
||||
func (evalBuilder) logical(op ast.LogicalOperatorKind) func(left, right semantic.Expression) semantic.Expression {
|
||||
return func(left, right semantic.Expression) semantic.Expression {
|
||||
return &semantic.LogicalExpression{
|
||||
Operator: op,
|
||||
Left: left,
|
||||
Right: right,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (evalBuilder) eval(op ast.OperatorKind) func(left, right semantic.Expression) semantic.Expression {
|
||||
return func(left, right semantic.Expression) semantic.Expression {
|
||||
return &semantic.BinaryExpression{
|
||||
Operator: op,
|
||||
Left: left,
|
||||
Right: right,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,6 @@ import (
|
|||
"github.com/influxdata/platform/query/semantic"
|
||||
)
|
||||
|
||||
var passThroughProperties = []*semantic.Property{}
|
||||
|
||||
func TestTranspiler(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
s string
|
||||
|
@ -206,6 +204,12 @@ func TestTranspiler(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "group0",
|
||||
Spec: &functions.GroupOpSpec{
|
||||
By: []string{"_measurement"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "map0",
|
||||
Spec: &functions.MapOpSpec{
|
||||
|
@ -257,7 +261,8 @@ func TestTranspiler(t *testing.T) {
|
|||
Edges: []query.Edge{
|
||||
{Parent: "from0", Child: "range0"},
|
||||
{Parent: "range0", Child: "filter0"},
|
||||
{Parent: "filter0", Child: "map0"},
|
||||
{Parent: "filter0", Child: "group0"},
|
||||
{Parent: "group0", Child: "map0"},
|
||||
{Parent: "map0", Child: "yield0"},
|
||||
},
|
||||
},
|
||||
|
@ -412,7 +417,7 @@ func TestTranspiler(t *testing.T) {
|
|||
Object: &semantic.IdentifierExpression{
|
||||
Name: "tables",
|
||||
},
|
||||
Property: "val0",
|
||||
Property: "t0",
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -421,15 +426,15 @@ func TestTranspiler(t *testing.T) {
|
|||
Object: &semantic.IdentifierExpression{
|
||||
Name: "tables",
|
||||
},
|
||||
Property: "val1",
|
||||
Property: "t1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
TableNames: map[query.OperationID]string{
|
||||
"mean0": "val0",
|
||||
"max0": "val1",
|
||||
"mean0": "t0",
|
||||
"max0": "t1",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -626,7 +631,7 @@ func TestTranspiler(t *testing.T) {
|
|||
Object: &semantic.IdentifierExpression{
|
||||
Name: "tables",
|
||||
},
|
||||
Property: "val0",
|
||||
Property: "t0",
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -635,18 +640,24 @@ func TestTranspiler(t *testing.T) {
|
|||
Object: &semantic.IdentifierExpression{
|
||||
Name: "tables",
|
||||
},
|
||||
Property: "val1",
|
||||
Property: "t1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
TableNames: map[query.OperationID]string{
|
||||
"filter0": "val0",
|
||||
"filter1": "val1",
|
||||
"filter0": "t0",
|
||||
"filter1": "t1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "group0",
|
||||
Spec: &functions.GroupOpSpec{
|
||||
By: []string{"_measurement"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "map0",
|
||||
Spec: &functions.MapOpSpec{
|
||||
|
@ -711,7 +722,8 @@ func TestTranspiler(t *testing.T) {
|
|||
{Parent: "range1", Child: "filter1"},
|
||||
{Parent: "filter0", Child: "join0"},
|
||||
{Parent: "filter1", Child: "join0"},
|
||||
{Parent: "join0", Child: "map0"},
|
||||
{Parent: "join0", Child: "group0"},
|
||||
{Parent: "group0", Child: "map0"},
|
||||
{Parent: "map0", Child: "yield0"},
|
||||
},
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue