Merge pull request #112 from influxdata/js-transpiler-conditions
feat(query/influxql): implement filter conditions for tags in the transpilerpull/10616/head
commit
c12e9675c0
|
@ -55,12 +55,9 @@ func createVarRefCursor(t *transpilerState, ref *influxql.VarRef) (cursor, error
|
|||
})
|
||||
|
||||
valuer := influxql.NowValuer{Now: t.now}
|
||||
cond, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)
|
||||
_, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if cond != nil {
|
||||
// TODO(jsternberg): Handle conditions.
|
||||
return nil, errors.New("unimplemented: conditions have not been implemented yet")
|
||||
}
|
||||
|
||||
range_ := t.op("range", &functions.RangeOpSpec{
|
||||
|
@ -120,3 +117,12 @@ func (c *varRefCursor) Value(expr influxql.Expr) (string, bool) {
|
|||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// opCursor wraps a cursor with a new id while delegating all calls to the
|
||||
// wrapped cursor.
|
||||
type opCursor struct {
|
||||
id query.OperationID
|
||||
cursor
|
||||
}
|
||||
|
||||
func (c *opCursor) ID() query.OperationID { return c.id }
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type groupInfo struct {
|
||||
|
@ -64,6 +66,7 @@ func identifyGroups(stmt *influxql.SelectStatement) []*groupInfo {
|
|||
|
||||
func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {
|
||||
// Create all of the cursors for every variable reference.
|
||||
// TODO(jsternberg): Determine which of these cursors are from fields and which are tags.
|
||||
var cursors []cursor
|
||||
if gr.call != nil {
|
||||
ref := gr.call.Args[0].(*influxql.VarRef)
|
||||
|
@ -82,21 +85,93 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {
|
|||
cursors = append(cursors, cur)
|
||||
}
|
||||
|
||||
// TODO(jsternberg): Establish which variables in the condition are tags and which are fields.
|
||||
// We need to create the references to fields here so they can be joined.
|
||||
var (
|
||||
tags map[influxql.VarRef]struct{}
|
||||
cond influxql.Expr
|
||||
)
|
||||
valuer := influxql.NowValuer{Now: t.now}
|
||||
if t.stmt.Condition != nil {
|
||||
var err error
|
||||
if cond, _, err = influxql.ConditionExpr(t.stmt.Condition, &valuer); err != nil {
|
||||
return nil, err
|
||||
} else if cond != nil {
|
||||
tags = make(map[influxql.VarRef]struct{})
|
||||
|
||||
// Walk through the condition for every variable reference. There will be no function
|
||||
// calls here.
|
||||
var condErr error
|
||||
influxql.WalkFunc(cond, func(node influxql.Node) {
|
||||
if condErr != nil {
|
||||
return
|
||||
}
|
||||
ref, ok := node.(*influxql.VarRef)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// If the variable reference is in any of the cursors, it is definitely
|
||||
// a field and we do not have to inspect it further.
|
||||
for _, cur := range cursors {
|
||||
if _, ok := cur.Value(ref); ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// This may be a field or a tag. If it is a field, we need to create the cursor
|
||||
// and add it to the listing of cursors so it can be joined before we evaluate the condition.
|
||||
switch typ := t.mapType(ref); typ {
|
||||
case influxql.Tag:
|
||||
// Add this variable name to the listing of tags.
|
||||
tags[*ref] = struct{}{}
|
||||
default:
|
||||
cur, err := createVarRefCursor(t, ref)
|
||||
if err != nil {
|
||||
condErr = err
|
||||
return
|
||||
}
|
||||
cursors = append(cursors, cur)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Join the cursors using an inner join.
|
||||
// TODO(jsternberg): We need to differentiate between various join types and this needs to be
|
||||
// except: ["_field"] rather than joining on the _measurement. This also needs to specify what the time
|
||||
// column should be.
|
||||
cur := Join(t, cursors, []string{"_measurement"}, nil)
|
||||
if len(tags) > 0 {
|
||||
cur = &tagsCursor{cursor: cur, tags: tags}
|
||||
}
|
||||
|
||||
// TODO(jsternberg): Handle conditions, function calls, multiple variable references, and basically
|
||||
// everything that needs to be done to create a cursor for a single group.
|
||||
// Evaluate the conditional and insert a filter if a condition exists.
|
||||
if cond != nil {
|
||||
// Generate a filter expression by evaluating the condition and wrapping it in a filter op.
|
||||
expr, err := t.mapField(cond, cur)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to evaluate condition")
|
||||
}
|
||||
id := t.op("filter", &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: expr,
|
||||
},
|
||||
}, cur.ID())
|
||||
cur = &opCursor{id: id, cursor: cur}
|
||||
}
|
||||
|
||||
// Group together the results.
|
||||
if c, err := gr.group(t, cur); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
cur = c
|
||||
}
|
||||
|
||||
// If a function call is present, evaluate the function call.
|
||||
if gr.call != nil {
|
||||
c, err := createFunctionCursor(t, gr.call, cur)
|
||||
if err != nil {
|
||||
|
@ -121,3 +196,22 @@ func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
|
|||
}
|
||||
|
||||
func (c *groupCursor) ID() query.OperationID { return c.id }
|
||||
|
||||
// tagsCursor is a pseudo-cursor that can be used to access tags within the cursor.
|
||||
type tagsCursor struct {
|
||||
cursor
|
||||
tags map[influxql.VarRef]struct{}
|
||||
}
|
||||
|
||||
func (c *tagsCursor) Value(expr influxql.Expr) (string, bool) {
|
||||
if value, ok := c.cursor.Value(expr); ok {
|
||||
return value, ok
|
||||
}
|
||||
|
||||
if ref, ok := expr.(*influxql.VarRef); ok {
|
||||
if _, ok := c.tags[*ref]; ok {
|
||||
return ref.Val, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
|
|
@ -92,6 +92,11 @@ func (t *transpilerState) Transpile(ctx context.Context) (*query.Spec, error) {
|
|||
return t.spec, nil
|
||||
}
|
||||
|
||||
func (t *transpilerState) mapType(ref *influxql.VarRef) influxql.DataType {
|
||||
// TODO(jsternberg): Actually evaluate the type against the schema.
|
||||
return influxql.Tag
|
||||
}
|
||||
|
||||
func (t *transpilerState) op(name string, spec query.OperationSpec, parents ...query.OperationID) query.OperationID {
|
||||
op := query.Operation{
|
||||
ID: query.OperationID(fmt.Sprintf("%s%d", name, t.nextID[name])),
|
||||
|
|
|
@ -728,6 +728,297 @@ func TestTranspiler(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
s: `SELECT mean(value) FROM db0..cpu WHERE host = 'server01'`,
|
||||
spec: &query.Spec{
|
||||
Operations: []*query.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &functions.FromOpSpec{
|
||||
Database: "db0",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "range0",
|
||||
Spec: &functions.RangeOpSpec{
|
||||
Start: query.Time{Absolute: time.Unix(0, influxqllib.MinTime)},
|
||||
Stop: query.Time{Absolute: time.Unix(0, influxqllib.MaxTime)},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "filter0",
|
||||
Spec: &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{
|
||||
{Key: &semantic.Identifier{Name: "r"}},
|
||||
},
|
||||
Body: &semantic.LogicalExpression{
|
||||
Operator: ast.AndOperator,
|
||||
Left: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_measurement",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "cpu",
|
||||
},
|
||||
},
|
||||
Right: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_field",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "value",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "filter1",
|
||||
Spec: &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{
|
||||
{Key: &semantic.Identifier{Name: "r"}},
|
||||
},
|
||||
Body: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "host",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "server01",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "group0",
|
||||
Spec: &functions.GroupOpSpec{
|
||||
By: []string{"_measurement"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "mean0",
|
||||
Spec: &functions.MeanOpSpec{
|
||||
AggregateConfig: execute.AggregateConfig{
|
||||
TimeSrc: execute.DefaultStartColLabel,
|
||||
TimeDst: execute.DefaultTimeColLabel,
|
||||
Columns: []string{execute.DefaultValueColLabel},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "map0",
|
||||
Spec: &functions.MapOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: &semantic.ObjectExpression{
|
||||
Properties: []*semantic.Property{
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "time"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_time",
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "_measurement"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_measurement",
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "mean"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_value",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "yield0",
|
||||
Spec: &functions.YieldOpSpec{
|
||||
Name: "0",
|
||||
},
|
||||
},
|
||||
},
|
||||
Edges: []query.Edge{
|
||||
{Parent: "from0", Child: "range0"},
|
||||
{Parent: "range0", Child: "filter0"},
|
||||
{Parent: "filter0", Child: "filter1"},
|
||||
{Parent: "filter1", Child: "group0"},
|
||||
{Parent: "group0", Child: "mean0"},
|
||||
{Parent: "mean0", Child: "map0"},
|
||||
{Parent: "map0", Child: "yield0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
s: `SELECT value FROM db0..cpu WHERE host = 'server01'`,
|
||||
spec: &query.Spec{
|
||||
Operations: []*query.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &functions.FromOpSpec{
|
||||
Database: "db0",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "range0",
|
||||
Spec: &functions.RangeOpSpec{
|
||||
Start: query.Time{Absolute: time.Unix(0, influxqllib.MinTime)},
|
||||
Stop: query.Time{Absolute: time.Unix(0, influxqllib.MaxTime)},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "filter0",
|
||||
Spec: &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{
|
||||
{Key: &semantic.Identifier{Name: "r"}},
|
||||
},
|
||||
Body: &semantic.LogicalExpression{
|
||||
Operator: ast.AndOperator,
|
||||
Left: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_measurement",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "cpu",
|
||||
},
|
||||
},
|
||||
Right: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_field",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "value",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "filter1",
|
||||
Spec: &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{
|
||||
{Key: &semantic.Identifier{Name: "r"}},
|
||||
},
|
||||
Body: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "host",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "server01",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "group0",
|
||||
Spec: &functions.GroupOpSpec{
|
||||
By: []string{"_measurement"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "map0",
|
||||
Spec: &functions.MapOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: &semantic.ObjectExpression{
|
||||
Properties: []*semantic.Property{
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "time"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_time",
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "_measurement"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_measurement",
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "value"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_value",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "yield0",
|
||||
Spec: &functions.YieldOpSpec{
|
||||
Name: "0",
|
||||
},
|
||||
},
|
||||
},
|
||||
Edges: []query.Edge{
|
||||
{Parent: "from0", Child: "range0"},
|
||||
{Parent: "range0", Child: "filter0"},
|
||||
{Parent: "filter0", Child: "filter1"},
|
||||
{Parent: "filter1", Child: "group0"},
|
||||
{Parent: "group0", Child: "map0"},
|
||||
{Parent: "map0", Child: "yield0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tt.s, func(t *testing.T) {
|
||||
if err := tt.spec.Validate(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue