412 lines
12 KiB
Go
412 lines
12 KiB
Go
package influxql
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/influxdata/flux"
|
|
"github.com/influxdata/flux/execute"
|
|
"github.com/influxdata/flux/functions/transformations"
|
|
"github.com/influxdata/flux/semantic"
|
|
"github.com/influxdata/influxql"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
type groupInfo struct {
|
|
call *influxql.Call
|
|
refs []*influxql.VarRef
|
|
selector bool
|
|
}
|
|
|
|
type groupVisitor struct {
|
|
calls []*function
|
|
refs []*influxql.VarRef
|
|
err error
|
|
}
|
|
|
|
func (v *groupVisitor) Visit(n influxql.Node) influxql.Visitor {
|
|
if v.err != nil {
|
|
return nil
|
|
}
|
|
|
|
// TODO(jsternberg): Identify duplicates so they are a single common instance.
|
|
switch expr := n.(type) {
|
|
case *influxql.Call:
|
|
// TODO(jsternberg): Identify math functions so we visit their arguments instead of recording them.
|
|
fn, err := parseFunction(expr)
|
|
if err != nil {
|
|
v.err = err
|
|
return nil
|
|
}
|
|
v.calls = append(v.calls, fn)
|
|
return nil
|
|
case *influxql.Distinct:
|
|
v.err = errors.New("unimplemented: distinct expression")
|
|
return nil
|
|
case *influxql.VarRef:
|
|
if expr.Val == "time" {
|
|
return nil
|
|
}
|
|
v.refs = append(v.refs, expr)
|
|
return nil
|
|
case *influxql.Wildcard:
|
|
v.err = errors.New("unimplemented: field wildcard")
|
|
return nil
|
|
case *influxql.RegexLiteral:
|
|
v.err = errors.New("unimplemented: field regex wildcard")
|
|
return nil
|
|
}
|
|
return v
|
|
}
|
|
|
|
// identifyGroups will identify the groups for creating data access cursors.
|
|
func identifyGroups(stmt *influxql.SelectStatement) ([]*groupInfo, error) {
|
|
v := &groupVisitor{}
|
|
influxql.Walk(v, stmt.Fields)
|
|
if v.err != nil {
|
|
return nil, v.err
|
|
}
|
|
|
|
// Attempt to take the calls and variables and put them into groups.
|
|
if len(v.refs) > 0 {
|
|
// If any of the calls are not selectors, we have an error message.
|
|
for _, fn := range v.calls {
|
|
if !influxql.IsSelector(fn.call) {
|
|
return nil, errors.New("mixing aggregate and non-aggregate queries is not supported")
|
|
}
|
|
}
|
|
|
|
// All of the functions are selectors. If we have more than 1, then we have another error message.
|
|
if len(v.calls) > 1 {
|
|
return nil, errors.New("mixing multiple selector functions with tags or fields is not supported")
|
|
}
|
|
|
|
// Otherwise, we create a single group.
|
|
var call *influxql.Call
|
|
if len(v.calls) == 1 {
|
|
call = v.calls[0].call
|
|
}
|
|
return []*groupInfo{{
|
|
call: call,
|
|
refs: v.refs,
|
|
selector: true, // Always a selector if we are here.
|
|
}}, nil
|
|
}
|
|
|
|
// We do not have any auxiliary fields so each of the function calls goes into
|
|
// its own group.
|
|
groups := make([]*groupInfo, 0, len(v.calls))
|
|
for _, fn := range v.calls {
|
|
groups = append(groups, &groupInfo{call: fn.call})
|
|
}
|
|
|
|
// If there is exactly one group and that contains a selector, then mark it as so.
|
|
if len(groups) == 1 && influxql.IsSelector(groups[0].call) {
|
|
groups[0].selector = true
|
|
}
|
|
return groups, nil
|
|
}
|
|
|
|
func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {
|
|
// Create all of the cursors for every variable reference.
|
|
// TODO(jsternberg): Determine which of these cursors are from fields and which are tags.
|
|
var cursors []cursor
|
|
if gr.call != nil {
|
|
ref, ok := gr.call.Args[0].(*influxql.VarRef)
|
|
if !ok {
|
|
// TODO(jsternberg): This should be validated and figured out somewhere else.
|
|
return nil, fmt.Errorf("first argument to %q must be a variable", gr.call.Name)
|
|
}
|
|
cur, err := createVarRefCursor(t, ref)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cursors = append(cursors, cur)
|
|
}
|
|
|
|
for _, ref := range gr.refs {
|
|
cur, err := createVarRefCursor(t, ref)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cursors = append(cursors, cur)
|
|
}
|
|
|
|
// TODO(jsternberg): Establish which variables in the condition are tags and which are fields.
|
|
// We need to create the references to fields here so they can be joined.
|
|
var (
|
|
tags map[influxql.VarRef]struct{}
|
|
cond influxql.Expr
|
|
)
|
|
valuer := influxql.NowValuer{Now: t.spec.Now}
|
|
if t.stmt.Condition != nil {
|
|
var err error
|
|
if cond, _, err = influxql.ConditionExpr(t.stmt.Condition, &valuer); err != nil {
|
|
return nil, err
|
|
} else if cond != nil {
|
|
tags = make(map[influxql.VarRef]struct{})
|
|
|
|
// Walk through the condition for every variable reference. There will be no function
|
|
// calls here.
|
|
var condErr error
|
|
influxql.WalkFunc(cond, func(node influxql.Node) {
|
|
if condErr != nil {
|
|
return
|
|
}
|
|
ref, ok := node.(*influxql.VarRef)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// If the variable reference is in any of the cursors, it is definitely
|
|
// a field and we do not have to inspect it further.
|
|
for _, cur := range cursors {
|
|
if _, ok := cur.Value(ref); ok {
|
|
return
|
|
}
|
|
}
|
|
|
|
// This may be a field or a tag. If it is a field, we need to create the cursor
|
|
// and add it to the listing of cursors so it can be joined before we evaluate the condition.
|
|
switch typ := t.mapType(ref); typ {
|
|
case influxql.Tag:
|
|
// Add this variable name to the listing of tags.
|
|
tags[*ref] = struct{}{}
|
|
default:
|
|
cur, err := createVarRefCursor(t, ref)
|
|
if err != nil {
|
|
condErr = err
|
|
return
|
|
}
|
|
cursors = append(cursors, cur)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// Join the cursors using an inner join.
|
|
// TODO(jsternberg): We need to differentiate between various join types and this needs to be
|
|
// except: ["_field"] rather than joining on the _measurement. This also needs to specify what the time
|
|
// column should be.
|
|
if len(cursors) > 1 {
|
|
return nil, errors.New("unimplemented: joining fields within a cursor")
|
|
}
|
|
|
|
cur := Join(t, cursors, []string{"_measurement"})
|
|
if len(tags) > 0 {
|
|
cur = &tagsCursor{cursor: cur, tags: tags}
|
|
}
|
|
|
|
// Evaluate the conditional and insert a filter if a condition exists.
|
|
if cond != nil {
|
|
// Generate a filter expression by evaluating the condition and wrapping it in a filter op.
|
|
expr, err := t.mapField(cond, cur)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "unable to evaluate condition")
|
|
}
|
|
id := t.op("filter", &transformations.FilterOpSpec{
|
|
Fn: &semantic.FunctionExpression{
|
|
Block: &semantic.FunctionBlock{
|
|
Parameters: &semantic.FunctionParameters{
|
|
List: []*semantic.FunctionParameter{{
|
|
Key: &semantic.Identifier{Name: "r"},
|
|
}},
|
|
},
|
|
Body: expr,
|
|
},
|
|
},
|
|
}, cur.ID())
|
|
cur = &opCursor{id: id, cursor: cur}
|
|
}
|
|
|
|
// Group together the results.
|
|
if c, err := gr.group(t, cur); err != nil {
|
|
return nil, err
|
|
} else {
|
|
cur = c
|
|
}
|
|
|
|
interval, err := t.stmt.GroupByInterval()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If a function call is present, evaluate the function call.
|
|
if gr.call != nil {
|
|
c, err := createFunctionCursor(t, gr.call, cur, !gr.selector)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cur = c
|
|
|
|
// If there was a window operation, we now need to undo that and sort by the start column
|
|
// so they stay in the same table and are joined in the correct order.
|
|
if interval > 0 {
|
|
cur = &groupCursor{
|
|
id: t.op("window", &transformations.WindowOpSpec{
|
|
Every: flux.Duration(math.MaxInt64),
|
|
Period: flux.Duration(math.MaxInt64),
|
|
TimeColumn: execute.DefaultTimeColLabel,
|
|
StartColumn: execute.DefaultStartColLabel,
|
|
StopColumn: execute.DefaultStopColLabel,
|
|
}, cur.ID()),
|
|
cursor: cur,
|
|
}
|
|
}
|
|
} else {
|
|
// If we do not have a function, but we have a field option,
|
|
// return the appropriate error message if there is something wrong with the flux.
|
|
if interval > 0 {
|
|
return nil, errors.New("using GROUP BY requires at least one aggregate function")
|
|
}
|
|
|
|
// TODO(jsternberg): Fill needs to be somewhere and it's probably here somewhere.
|
|
// Move this to the correct location once we've figured it out.
|
|
switch t.stmt.Fill {
|
|
case influxql.NoFill:
|
|
return nil, errors.New("fill(none) must be used with a function")
|
|
case influxql.LinearFill:
|
|
return nil, errors.New("fill(linear) must be used with a function")
|
|
}
|
|
}
|
|
return cur, nil
|
|
}
|
|
|
|
type groupCursor struct {
|
|
cursor
|
|
id flux.OperationID
|
|
}
|
|
|
|
func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
|
|
var windowEvery time.Duration
|
|
var windowStart time.Time
|
|
tags := []string{"_measurement", "_start"}
|
|
if len(t.stmt.Dimensions) > 0 {
|
|
// Maintain a set of the dimensions we have encountered.
|
|
// This is so we don't duplicate groupings, but we still maintain the
|
|
// listing of tags in the tags slice so it is deterministic.
|
|
m := make(map[string]struct{})
|
|
for _, d := range t.stmt.Dimensions {
|
|
// Reduce the expression before attempting anything. Do not evaluate the call.
|
|
expr := influxql.Reduce(d.Expr, nil)
|
|
|
|
switch expr := expr.(type) {
|
|
case *influxql.VarRef:
|
|
if strings.ToLower(expr.Val) == "time" {
|
|
return nil, errors.New("time() is a function and expects at least one argument")
|
|
} else if _, ok := m[expr.Val]; ok {
|
|
continue
|
|
}
|
|
tags = append(tags, expr.Val)
|
|
m[expr.Val] = struct{}{}
|
|
case *influxql.Call:
|
|
// Ensure the call is time() and it has one or two duration arguments.
|
|
if expr.Name != "time" {
|
|
return nil, errors.New("only time() calls allowed in dimensions")
|
|
} else if got := len(expr.Args); got < 1 || got > 2 {
|
|
return nil, errors.New("time dimension expected 1 or 2 arguments")
|
|
} else if lit, ok := expr.Args[0].(*influxql.DurationLiteral); !ok {
|
|
return nil, errors.New("time dimension must have duration argument")
|
|
} else if windowEvery != 0 {
|
|
return nil, errors.New("multiple time dimensions not allowed")
|
|
} else {
|
|
windowEvery = lit.Val
|
|
var windowOffset time.Duration
|
|
if len(expr.Args) == 2 {
|
|
switch lit2 := expr.Args[1].(type) {
|
|
case *influxql.DurationLiteral:
|
|
windowOffset = lit2.Val % windowEvery
|
|
case *influxql.TimeLiteral:
|
|
windowOffset = lit2.Val.Sub(lit2.Val.Truncate(windowEvery))
|
|
case *influxql.Call:
|
|
if lit2.Name != "now" {
|
|
return nil, errors.New("time dimension offset function must be now()")
|
|
} else if len(lit2.Args) != 0 {
|
|
return nil, errors.New("time dimension offset now() function requires no arguments")
|
|
}
|
|
now := t.spec.Now
|
|
windowOffset = now.Sub(now.Truncate(windowEvery))
|
|
|
|
// Use the evaluated offset to replace the argument. Ideally, we would
|
|
// use the interval assigned above, but the query engine hasn't been changed
|
|
// to use the compiler information yet.
|
|
expr.Args[1] = &influxql.DurationLiteral{Val: windowOffset}
|
|
case *influxql.StringLiteral:
|
|
// If literal looks like a date time then parse it as a time literal.
|
|
if lit2.IsTimeLiteral() {
|
|
t, err := lit2.ToTimeLiteral(t.stmt.Location)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
windowOffset = t.Val.Sub(t.Val.Truncate(windowEvery))
|
|
} else {
|
|
return nil, errors.New("time dimension offset must be duration or now()")
|
|
}
|
|
default:
|
|
return nil, errors.New("time dimension offset must be duration or now()")
|
|
}
|
|
|
|
//TODO set windowStart
|
|
windowStart = time.Unix(0, 0).Add(windowOffset)
|
|
}
|
|
}
|
|
case *influxql.Wildcard:
|
|
return nil, errors.New("unimplemented: dimension wildcards")
|
|
case *influxql.RegexLiteral:
|
|
return nil, errors.New("unimplemented: dimension regex wildcards")
|
|
default:
|
|
return nil, errors.New("only time and tag dimensions allowed")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Perform the grouping by the tags we found. There is always a group by because
|
|
// there is always something to group in influxql.
|
|
// TODO(jsternberg): A wildcard will skip this step.
|
|
id := t.op("group", &transformations.GroupOpSpec{
|
|
Columns: tags,
|
|
Mode: "by",
|
|
}, in.ID())
|
|
|
|
if windowEvery > 0 {
|
|
windowOp := &transformations.WindowOpSpec{
|
|
Every: flux.Duration(windowEvery),
|
|
Period: flux.Duration(windowEvery),
|
|
TimeColumn: execute.DefaultTimeColLabel,
|
|
StartColumn: execute.DefaultStartColLabel,
|
|
StopColumn: execute.DefaultStopColLabel,
|
|
}
|
|
|
|
if !windowStart.IsZero() {
|
|
windowOp.Start = flux.Time{Absolute: windowStart}
|
|
}
|
|
|
|
id = t.op("window", windowOp, id)
|
|
}
|
|
|
|
return &groupCursor{id: id, cursor: in}, nil
|
|
}
|
|
|
|
// ID returns the operation id stored on the group cursor, taking the place of
// the embedded cursor's ID method.
func (c *groupCursor) ID() flux.OperationID {
	return c.id
}
|
|
|
|
// tagsCursor is a pseudo-cursor that can be used to access tags within the cursor.
|
|
type tagsCursor struct {
|
|
cursor
|
|
tags map[influxql.VarRef]struct{}
|
|
}
|
|
|
|
func (c *tagsCursor) Value(expr influxql.Expr) (string, bool) {
|
|
if value, ok := c.cursor.Value(expr); ok {
|
|
return value, ok
|
|
}
|
|
|
|
if ref, ok := expr.(*influxql.VarRef); ok {
|
|
if _, ok := c.tags[*ref]; ok {
|
|
return ref.Val, true
|
|
}
|
|
}
|
|
return "", false
|
|
}
|