influxdb/query/influxql/cursor.go

182 lines
4.5 KiB
Go

package influxql
import (
"errors"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/execute"
"github.com/influxdata/influxql"
)
// cursor is holds known information about the current stream. It maps the influxql ast information
// to the attributes on a table.
type cursor interface {
// Expr is the AST expression that produces this table.
Expr() ast.Expression
// Keys returns all of the expressions that this cursor contains.
Keys() []influxql.Expr
// Value returns the string that can be used to access the computed expression.
// If this cursor does not produce the expression, this returns false for the second
// return argument.
Value(expr influxql.Expr) (string, bool)
}
// varRefCursor contains a cursor for a single variable. This is usually the raw value
// coming from the database and points to the default value column property.
type varRefCursor struct {
expr ast.Expression
ref *influxql.VarRef
}
// createVarRefCursor creates a new cursor from a variable reference using the sources
// in the transpilerState.
func createVarRefCursor(t *transpilerState, ref *influxql.VarRef) (cursor, error) {
if len(t.stmt.Sources) != 1 {
// TODO(jsternberg): Support multiple sources.
return nil, errors.New("unimplemented: only one source is allowed")
}
// Only support a direct measurement. Subqueries are not supported yet.
mm, ok := t.stmt.Sources[0].(*influxql.Measurement)
if !ok {
return nil, errors.New("unimplemented: source must be a measurement")
}
// Create the from spec and add it to the list of operations.
from, err := t.from(mm)
if err != nil {
return nil, err
}
valuer := influxql.NowValuer{Now: t.config.Now}
_, tr, err := influxql.ConditionExpr(t.stmt.Condition, &valuer)
if err != nil {
return nil, err
}
// If the maximum is not set and we have a windowing function, then
// the end time will be set to now.
if tr.Max.IsZero() {
if window, err := t.stmt.GroupByInterval(); err == nil && window > 0 {
tr.Max = t.config.Now
}
}
range_ := &ast.PipeExpression{
Argument: from,
Call: &ast.CallExpression{
Callee: &ast.Identifier{
Name: "range",
},
Arguments: []ast.Expression{
&ast.ObjectExpression{
Properties: []*ast.Property{
{
Key: &ast.Identifier{
Name: "start",
},
Value: &ast.DateTimeLiteral{
Value: tr.MinTime().UTC(),
},
},
{
Key: &ast.Identifier{
Name: "stop",
},
Value: &ast.DateTimeLiteral{
Value: tr.MaxTime().UTC(),
},
},
},
},
},
},
}
expr := &ast.PipeExpression{
Argument: range_,
Call: &ast.CallExpression{
Callee: &ast.Identifier{
Name: "filter",
},
Arguments: []ast.Expression{
&ast.ObjectExpression{
Properties: []*ast.Property{
{
Key: &ast.Identifier{
Name: "fn",
},
Value: &ast.FunctionExpression{
Params: []*ast.Property{{
Key: &ast.Identifier{
Name: "r",
},
}},
Body: &ast.LogicalExpression{
Operator: ast.AndOperator,
Left: &ast.BinaryExpression{
Operator: ast.EqualOperator,
Left: &ast.MemberExpression{
Object: &ast.Identifier{Name: "r"},
Property: &ast.Identifier{Name: "_measurement"},
},
Right: &ast.StringLiteral{
Value: mm.Name,
},
},
Right: &ast.BinaryExpression{
Operator: ast.EqualOperator,
Left: &ast.MemberExpression{
Object: &ast.Identifier{Name: "r"},
Property: &ast.Identifier{Name: "_field"},
},
Right: &ast.StringLiteral{
Value: ref.Val,
},
},
},
},
},
},
},
},
},
}
return &varRefCursor{
expr: expr,
ref: ref,
}, nil
}
func (c *varRefCursor) Expr() ast.Expression {
return c.expr
}
func (c *varRefCursor) Keys() []influxql.Expr {
return []influxql.Expr{c.ref}
}
func (c *varRefCursor) Value(expr influxql.Expr) (string, bool) {
ref, ok := expr.(*influxql.VarRef)
if !ok {
return "", false
}
// If these are the same variable reference (by pointer), then they are equal.
if ref == c.ref || *ref == *c.ref {
return execute.DefaultValueColLabel, true
}
return "", false
}
// pipeCursor wraps a cursor with a new expression while delegating all calls to the
// wrapped cursor.
type pipeCursor struct {
expr ast.Expression
cursor
}
func (c *pipeCursor) Expr() ast.Expression { return c.expr }