2018-06-08 18:07:10 +00:00
|
|
|
package influxql
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
|
2018-09-06 18:09:52 +00:00
|
|
|
"github.com/influxdata/flux"
|
|
|
|
"github.com/influxdata/flux/ast"
|
|
|
|
"github.com/influxdata/flux/execute"
|
2018-10-05 04:06:14 +00:00
|
|
|
"github.com/influxdata/flux/functions/transformations"
|
2018-09-06 18:09:52 +00:00
|
|
|
"github.com/influxdata/flux/semantic"
|
2018-06-08 18:07:10 +00:00
|
|
|
"github.com/influxdata/influxql"
|
|
|
|
)
|
|
|
|
|
|
|
|
// cursor is holds known information about the current stream. It maps the influxql ast information
|
|
|
|
// to the attributes on a table.
|
|
|
|
type cursor interface {
|
|
|
|
// ID contains the last id that produces this cursor.
|
2018-09-06 18:09:52 +00:00
|
|
|
ID() flux.OperationID
|
2018-06-08 18:07:10 +00:00
|
|
|
|
|
|
|
// Keys returns all of the expressions that this cursor contains.
|
|
|
|
Keys() []influxql.Expr
|
|
|
|
|
|
|
|
// Value returns the string that can be used to access the computed expression.
|
|
|
|
// If this cursor does not produce the expression, this returns false for the second
|
|
|
|
// return argument.
|
|
|
|
Value(expr influxql.Expr) (string, bool)
|
|
|
|
}
|
|
|
|
|
|
|
|
// varRefCursor contains a cursor for a single variable. This is usually the raw value
|
|
|
|
// coming from the database and points to the default value column property.
|
|
|
|
type varRefCursor struct {
|
2018-09-06 18:09:52 +00:00
|
|
|
id flux.OperationID
|
2018-06-08 18:07:10 +00:00
|
|
|
ref *influxql.VarRef
|
|
|
|
}
|
|
|
|
|
|
|
|
// createVarRefCursor creates a new cursor from a variable reference using the sources
|
|
|
|
// in the transpilerState.
|
|
|
|
func createVarRefCursor(t *transpilerState, ref *influxql.VarRef) (cursor, error) {
|
|
|
|
if len(t.stmt.Sources) != 1 {
|
|
|
|
// TODO(jsternberg): Support multiple sources.
|
|
|
|
return nil, errors.New("unimplemented: only one source is allowed")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Only support a direct measurement. Subqueries are not supported yet.
|
|
|
|
mm, ok := t.stmt.Sources[0].(*influxql.Measurement)
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.New("unimplemented: source must be a measurement")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create the from spec and add it to the list of operations.
|
2018-06-11 15:13:11 +00:00
|
|
|
from, err := t.from(mm)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-06-08 18:07:10 +00:00
|
|
|
|
2018-07-25 13:34:41 +00:00
|
|
|
valuer := influxql.NowValuer{Now: t.spec.Now}
|
2018-06-08 19:18:41 +00:00
|
|
|
_, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)
|
2018-06-08 18:07:10 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-06-13 15:07:06 +00:00
|
|
|
// If the maximum is not set and we have a windowing function, then
|
|
|
|
// the end time will be set to now.
|
|
|
|
if tr.Max.IsZero() {
|
|
|
|
if window, err := t.stmt.GroupByInterval(); err == nil && window > 0 {
|
2018-07-25 13:34:41 +00:00
|
|
|
tr.Max = t.spec.Now
|
2018-06-13 15:07:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-05 04:06:14 +00:00
|
|
|
range_ := t.op("range", &transformations.RangeOpSpec{
|
2018-11-05 18:49:01 +00:00
|
|
|
Start: flux.Time{Absolute: tr.MinTime()},
|
|
|
|
Stop: flux.Time{Absolute: tr.MaxTime()},
|
|
|
|
TimeColumn: execute.DefaultTimeColLabel,
|
|
|
|
StartColumn: execute.DefaultStartColLabel,
|
|
|
|
StopColumn: execute.DefaultStopColLabel,
|
2018-06-08 18:07:10 +00:00
|
|
|
}, from)
|
|
|
|
|
2018-10-05 04:06:14 +00:00
|
|
|
id := t.op("filter", &transformations.FilterOpSpec{
|
2018-06-08 18:07:10 +00:00
|
|
|
Fn: &semantic.FunctionExpression{
|
2018-10-31 18:39:36 +00:00
|
|
|
Block: &semantic.FunctionBlock{
|
|
|
|
Parameters: &semantic.FunctionParameters{
|
|
|
|
List: []*semantic.FunctionParameter{
|
|
|
|
{Key: &semantic.Identifier{Name: "r"}},
|
2018-06-08 18:07:10 +00:00
|
|
|
},
|
|
|
|
},
|
2018-10-31 18:39:36 +00:00
|
|
|
Body: &semantic.LogicalExpression{
|
|
|
|
Operator: ast.AndOperator,
|
|
|
|
Left: &semantic.BinaryExpression{
|
|
|
|
Operator: ast.EqualOperator,
|
|
|
|
Left: &semantic.MemberExpression{
|
|
|
|
Object: &semantic.IdentifierExpression{Name: "r"},
|
|
|
|
Property: "_measurement",
|
|
|
|
},
|
|
|
|
Right: &semantic.StringLiteral{Value: mm.Name},
|
|
|
|
},
|
|
|
|
Right: &semantic.BinaryExpression{
|
|
|
|
Operator: ast.EqualOperator,
|
|
|
|
Left: &semantic.MemberExpression{
|
|
|
|
Object: &semantic.IdentifierExpression{Name: "r"},
|
|
|
|
Property: "_field",
|
|
|
|
},
|
|
|
|
Right: &semantic.StringLiteral{Value: ref.Val},
|
2018-06-08 18:07:10 +00:00
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}, range_)
|
|
|
|
return &varRefCursor{
|
|
|
|
id: id,
|
|
|
|
ref: ref,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2018-09-06 18:09:52 +00:00
|
|
|
func (c *varRefCursor) ID() flux.OperationID {
|
2018-06-08 18:07:10 +00:00
|
|
|
return c.id
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *varRefCursor) Keys() []influxql.Expr {
|
|
|
|
return []influxql.Expr{c.ref}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *varRefCursor) Value(expr influxql.Expr) (string, bool) {
|
|
|
|
ref, ok := expr.(*influxql.VarRef)
|
|
|
|
if !ok {
|
|
|
|
return "", false
|
|
|
|
}
|
|
|
|
|
|
|
|
// If these are the same variable reference (by pointer), then they are equal.
|
|
|
|
if ref == c.ref || *ref == *c.ref {
|
|
|
|
return execute.DefaultValueColLabel, true
|
|
|
|
}
|
|
|
|
return "", false
|
|
|
|
}
|
2018-06-08 19:18:41 +00:00
|
|
|
|
|
|
|
// opCursor wraps a cursor with a new id while delegating all calls to the
|
|
|
|
// wrapped cursor.
|
|
|
|
type opCursor struct {
|
2018-09-06 18:09:52 +00:00
|
|
|
id flux.OperationID
|
2018-06-08 19:18:41 +00:00
|
|
|
cursor
|
|
|
|
}
|
|
|
|
|
2018-09-06 18:09:52 +00:00
|
|
|
// ID returns the id stored in the opCursor itself rather than the id of the
// embedded cursor.
func (c *opCursor) ID() flux.OperationID {
	return c.id
}
|