influxdb/query/select.go

1406 lines
38 KiB
Go
Raw Normal View History

package query
2015-11-04 21:06:06 +00:00
import (
"errors"
"fmt"
"math"
2015-11-04 21:06:06 +00:00
"sort"
"time"
"github.com/influxdata/influxdb/influxql"
2015-11-04 21:06:06 +00:00
)
// SelectOptions are options that customize the select call.
type SelectOptions struct {
// Authorizer is used to limit access to data
Authorizer Authorizer
// The lower bound for a select call.
MinTime time.Time
// The upper bound for a select call.
MaxTime time.Time
// Node to exclusively read from.
// If zero, all nodes are used.
NodeID uint64
// An optional channel that, if closed, signals that the select should be
// interrupted.
InterruptCh <-chan struct{}
// Maximum number of concurrent series.
MaxSeriesN int
}
2015-11-04 21:06:06 +00:00
// Select executes stmt against ic and returns a list of iterators to stream from.
//
// Statements should have all rewriting performed before calling select(). This
// includes wildcard and source expansion.
func Select(stmt *influxql.SelectStatement, ic IteratorCreator, sopt *SelectOptions) ([]Iterator, error) {
2015-11-04 21:06:06 +00:00
// Determine base options for iterators.
opt, err := newIteratorOptionsStmt(stmt, sopt)
2015-11-04 21:06:06 +00:00
if err != nil {
return nil, err
}
return buildIterators(stmt, ic, opt)
}
2015-11-04 21:06:06 +00:00
func buildIterators(stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) ([]Iterator, error) {
2015-11-04 21:06:06 +00:00
// Retrieve refs for each call and var ref.
info := newSelectInfo(stmt)
if len(info.calls) > 1 && len(info.refs) > 0 {
return nil, errors.New("cannot select fields when selecting multiple aggregates")
}
// Determine auxiliary fields to be selected.
opt.Aux = make([]influxql.VarRef, 0, len(info.refs))
2015-11-04 21:06:06 +00:00
for ref := range info.refs {
opt.Aux = append(opt.Aux, *ref)
2015-11-04 21:06:06 +00:00
}
sort.Sort(influxql.VarRefs(opt.Aux))
2015-11-04 21:06:06 +00:00
// If there are multiple auxilary fields and no calls then construct an aux iterator.
if len(info.calls) == 0 && len(info.refs) > 0 {
return buildAuxIterators(stmt.Fields, ic, stmt.Sources, opt)
2015-11-04 21:06:06 +00:00
}
// Include auxiliary fields from top() and bottom() when not writing the results.
fields := stmt.Fields
if stmt.Target == nil {
extraFields := 0
for call := range info.calls {
if call.Name == "top" || call.Name == "bottom" {
for i := 1; i < len(call.Args)-1; i++ {
ref := call.Args[i].(*influxql.VarRef)
opt.Aux = append(opt.Aux, *ref)
extraFields++
}
}
}
if extraFields > 0 {
// Rebuild the list of fields if any extra fields are being implicitly added
fields = make([]*influxql.Field, 0, len(stmt.Fields)+extraFields)
for _, f := range stmt.Fields {
fields = append(fields, f)
switch expr := f.Expr.(type) {
case *influxql.Call:
if expr.Name == "top" || expr.Name == "bottom" {
for i := 1; i < len(expr.Args)-1; i++ {
fields = append(fields, &influxql.Field{Expr: expr.Args[i]})
}
}
}
}
}
}
// Determine if there is one call and it is a selector.
selector := false
if len(info.calls) == 1 {
for call := range info.calls {
selector = influxql.IsSelector(call)
}
}
return buildFieldIterators(fields, ic, stmt.Sources, opt, selector, stmt.Target != nil)
2015-11-04 21:06:06 +00:00
}
// buildAuxIterators creates a set of iterators from a single combined auxiliary iterator.
func buildAuxIterators(fields influxql.Fields, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions) ([]Iterator, error) {
// Create the auxiliary iterators for each source.
inputs := make([]Iterator, 0, len(sources))
if err := func() error {
for _, source := range sources {
switch source := source.(type) {
case *influxql.Measurement:
input, err := ic.CreateIterator(source, opt)
if err != nil {
return err
}
inputs = append(inputs, input)
case *influxql.SubQuery:
b := subqueryBuilder{
ic: ic,
stmt: source.Statement,
}
input, err := b.buildAuxIterator(opt)
if err != nil {
return err
}
inputs = append(inputs, input)
}
}
return nil
}(); err != nil {
Iterators(inputs).Close()
return nil, err
}
// Merge iterators to read auxilary fields.
input, err := Iterators(inputs).Merge(opt)
2015-11-04 21:06:06 +00:00
if err != nil {
Iterators(inputs).Close()
2015-11-04 21:06:06 +00:00
return nil, err
} else if input == nil {
input = &nilFloatIterator{}
2015-11-04 21:06:06 +00:00
}
2016-02-05 17:23:35 +00:00
// Filter out duplicate rows, if required.
if opt.Dedupe {
// If there is no group by and it is a float iterator, see if we can use a fast dedupe.
if itr, ok := input.(FloatIterator); ok && len(opt.Dimensions) == 0 {
if sz := len(fields); sz > 0 && sz < 3 {
input = newFloatFastDedupeIterator(itr)
} else {
input = NewDedupeIterator(itr)
}
} else {
input = NewDedupeIterator(input)
}
2016-02-05 17:23:35 +00:00
}
2015-12-24 04:42:10 +00:00
// Apply limit & offset.
if opt.Limit > 0 || opt.Offset > 0 {
input = NewLimitIterator(input, opt)
}
// Wrap in an auxiliary iterator to separate the fields.
aitr := NewAuxIterator(input, opt)
2015-11-04 21:06:06 +00:00
// Generate iterators for each field.
itrs := make([]Iterator, len(fields))
if err := func() error {
for i, f := range fields {
expr := influxql.Reduce(f.Expr, nil)
itr, err := buildAuxIterator(expr, aitr, opt)
if err != nil {
return err
2015-11-04 21:06:06 +00:00
}
itrs[i] = itr
2015-11-04 21:06:06 +00:00
}
return nil
}(); err != nil {
Iterators(Iterators(itrs).filterNonNil()).Close()
aitr.Close()
return nil, err
2015-11-04 21:06:06 +00:00
}
// Background the primary iterator since there is no reader for it.
aitr.Background()
2015-11-04 21:06:06 +00:00
return itrs, nil
}
// buildAuxIterator constructs an Iterator for an expression from an AuxIterator.
func buildAuxIterator(expr influxql.Expr, aitr AuxIterator, opt IteratorOptions) (Iterator, error) {
switch expr := expr.(type) {
case *influxql.VarRef:
return aitr.Iterator(expr.Val, expr.Type), nil
case *influxql.BinaryExpr:
if rhs, ok := expr.RHS.(influxql.Literal); ok {
// The right hand side is a literal. It is more common to have the RHS be a literal,
// so we check that one first and have this be the happy path.
if lhs, ok := expr.LHS.(influxql.Literal); ok {
// We have two literals that couldn't be combined by Reduce.
return nil, fmt.Errorf("unable to construct an iterator from two literals: LHS: %T, RHS: %T", lhs, rhs)
}
lhs, err := buildAuxIterator(expr.LHS, aitr, opt)
if err != nil {
return nil, err
}
return buildRHSTransformIterator(lhs, rhs, expr.Op, opt)
} else if lhs, ok := expr.LHS.(influxql.Literal); ok {
rhs, err := buildAuxIterator(expr.RHS, aitr, opt)
if err != nil {
return nil, err
}
return buildLHSTransformIterator(lhs, rhs, expr.Op, opt)
} else {
// We have two iterators. Combine them into a single iterator.
lhs, err := buildAuxIterator(expr.LHS, aitr, opt)
if err != nil {
return nil, err
}
rhs, err := buildAuxIterator(expr.RHS, aitr, opt)
if err != nil {
return nil, err
}
return buildTransformIterator(lhs, rhs, expr.Op, opt)
}
case *influxql.ParenExpr:
return buildAuxIterator(expr.Expr, aitr, opt)
case *influxql.NilLiteral:
return &nilFloatIterator{}, nil
default:
return nil, fmt.Errorf("invalid expression type: %T", expr)
}
}
2016-01-25 17:36:04 +00:00
// buildFieldIterators creates an iterator for each field expression.
func buildFieldIterators(fields influxql.Fields, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions, selector, writeMode bool) ([]Iterator, error) {
2015-11-04 21:06:06 +00:00
// Create iterators from fields against the iterator creator.
2015-11-04 21:06:06 +00:00
itrs := make([]Iterator, len(fields))
2015-11-04 21:06:06 +00:00
if err := func() error {
2015-11-04 21:06:06 +00:00
hasAuxFields := false
2016-01-25 17:36:04 +00:00
2015-11-04 21:06:06 +00:00
var input Iterator
for i, f := range fields {
// Build iterators for calls first and save the iterator.
// We do this so we can keep the ordering provided by the user, but
// still build the Call's iterator first.
if influxql.ContainsVarRef(f.Expr) {
2015-11-04 21:06:06 +00:00
hasAuxFields = true
continue
}
expr := influxql.Reduce(f.Expr, nil)
itr, err := buildExprIterator(expr, ic, sources, opt, selector, writeMode)
2015-11-04 21:06:06 +00:00
if err != nil {
return err
} else if itr == nil {
itr = &nilFloatIterator{}
2015-11-04 21:06:06 +00:00
}
// If there is a limit or offset then apply it.
if opt.Limit > 0 || opt.Offset > 0 {
itr = NewLimitIterator(itr, opt)
}
2015-11-04 21:06:06 +00:00
itrs[i] = itr
input = itr
}
2016-01-18 22:48:49 +00:00
if input == nil || !hasAuxFields {
return nil
}
2015-11-04 21:06:06 +00:00
// Build the aux iterators. Previous validation should ensure that only one
// call was present so we build an AuxIterator from that input.
aitr := NewAuxIterator(input, opt)
2016-01-18 22:48:49 +00:00
for i, f := range fields {
if itrs[i] != nil {
itrs[i] = aitr
continue
}
2015-11-04 21:06:06 +00:00
expr := influxql.Reduce(f.Expr, nil)
itr, err := buildAuxIterator(expr, aitr, opt)
2016-01-18 22:48:49 +00:00
if err != nil {
return err
} else if itr == nil {
itr = &nilFloatIterator{}
2015-11-04 21:06:06 +00:00
}
2016-01-18 22:48:49 +00:00
itrs[i] = itr
2015-11-04 21:06:06 +00:00
}
aitr.Start()
2015-11-04 21:06:06 +00:00
return nil
}(); err != nil {
2015-11-04 21:06:06 +00:00
Iterators(Iterators(itrs).filterNonNil()).Close()
2015-11-04 21:06:06 +00:00
return nil, err
}
return itrs, nil
}
// buildExprIterator creates an iterator for an expression.
func buildExprIterator(expr influxql.Expr, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions, selector, writeMode bool) (Iterator, error) {
2015-11-04 21:06:06 +00:00
opt.Expr = expr
b := exprIteratorBuilder{
ic: ic,
sources: sources,
opt: opt,
selector: selector,
writeMode: writeMode,
}
2015-11-04 21:06:06 +00:00
switch expr := expr.(type) {
case *influxql.VarRef:
return b.buildVarRefIterator(expr)
case *influxql.Call:
return b.buildCallIterator(expr)
case *influxql.BinaryExpr:
return b.buildBinaryExprIterator(expr)
case *influxql.ParenExpr:
return buildExprIterator(expr.Expr, ic, sources, opt, selector, writeMode)
case *influxql.NilLiteral:
return &nilFloatIterator{}, nil
default:
return nil, fmt.Errorf("invalid expression type: %T", expr)
}
}
2015-11-04 21:06:06 +00:00
type exprIteratorBuilder struct {
ic IteratorCreator
sources influxql.Sources
opt IteratorOptions
selector bool
writeMode bool
}
func (b *exprIteratorBuilder) buildVarRefIterator(expr *influxql.VarRef) (Iterator, error) {
inputs := make([]Iterator, 0, len(b.sources))
if err := func() error {
for _, source := range b.sources {
switch source := source.(type) {
case *influxql.Measurement:
input, err := b.ic.CreateIterator(source, b.opt)
if err != nil {
return err
}
inputs = append(inputs, input)
case *influxql.SubQuery:
subquery := subqueryBuilder{
ic: b.ic,
stmt: source.Statement,
}
2016-05-12 21:11:19 +00:00
input, err := subquery.buildVarRefIterator(expr, b.opt)
if err != nil {
return err
}
inputs = append(inputs, input)
}
}
return nil
}(); err != nil {
Iterators(inputs).Close()
return nil, err
}
// Variable references in this section will always go into some call
// iterator. Combine it with a merge iterator.
itr := NewMergeIterator(inputs, b.opt)
if itr == nil {
itr = &nilFloatIterator{}
}
if b.opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, b.opt.InterruptCh)
}
return itr, nil
}
func (b *exprIteratorBuilder) buildCallIterator(expr *influxql.Call) (Iterator, error) {
// TODO(jsternberg): Refactor this. This section needs to die in a fire.
opt := b.opt
// Eliminate limits and offsets if they were previously set. These are handled by the caller.
opt.Limit, opt.Offset = 0, 0
switch expr.Name {
case "distinct":
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*influxql.VarRef), b.ic, b.sources, opt, b.selector, false)
if err != nil {
return nil, err
}
input, err = NewDistinctIterator(input, opt)
if err != nil {
return nil, err
}
return NewIntervalIterator(input, opt), nil
case "sample":
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector, false)
if err != nil {
return nil, err
}
size := expr.Args[1].(*influxql.IntegerLiteral)
return newSampleIterator(input, opt, int(size.Val))
case "holt_winters", "holt_winters_with_fit":
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector, false)
if err != nil {
return nil, err
}
h := expr.Args[1].(*influxql.IntegerLiteral)
m := expr.Args[2].(*influxql.IntegerLiteral)
includeFitData := "holt_winters_with_fit" == expr.Name
interval := opt.Interval.Duration
// Redefine interval to be unbounded to capture all aggregate results
opt.StartTime = influxql.MinTime
opt.EndTime = influxql.MaxTime
opt.Interval = Interval{}
return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeFitData, interval)
case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "elapsed":
if !opt.Interval.IsZero() {
if opt.Ascending {
opt.StartTime -= int64(opt.Interval.Duration)
} else {
opt.EndTime += int64(opt.Interval.Duration)
}
}
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector, false)
if err != nil {
return nil, err
}
switch expr.Name {
case "derivative", "non_negative_derivative":
interval := opt.DerivativeInterval()
isNonNegative := (expr.Name == "non_negative_derivative")
return newDerivativeIterator(input, opt, interval, isNonNegative)
case "elapsed":
interval := opt.ElapsedInterval()
return newElapsedIterator(input, opt, interval)
case "difference", "non_negative_difference":
isNonNegative := (expr.Name == "non_negative_difference")
return newDifferenceIterator(input, opt, isNonNegative)
case "moving_average":
n := expr.Args[1].(*influxql.IntegerLiteral)
if n.Val > 1 && !opt.Interval.IsZero() {
if opt.Ascending {
opt.StartTime -= int64(opt.Interval.Duration) * (n.Val - 1)
} else {
opt.EndTime += int64(opt.Interval.Duration) * (n.Val - 1)
}
}
return newMovingAverageIterator(input, int(n.Val), opt)
}
panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name))
case "cumulative_sum":
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector, false)
if err != nil {
return nil, err
}
return newCumulativeSumIterator(input, opt)
case "integral":
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*influxql.VarRef), b.ic, b.sources, opt, false, false)
if err != nil {
return nil, err
}
interval := opt.IntegralInterval()
return newIntegralIterator(input, opt, interval)
case "top":
if len(expr.Args) < 2 {
return nil, fmt.Errorf("top() requires 2 or more arguments, got %d", len(expr.Args))
}
var input Iterator
if len(expr.Args) > 2 {
// Create a max iterator using the groupings in the arguments.
dims := make(map[string]struct{}, len(expr.Args)-2+len(opt.GroupBy))
for i := 1; i < len(expr.Args)-1; i++ {
ref := expr.Args[i].(*influxql.VarRef)
dims[ref.Val] = struct{}{}
}
for dim := range opt.GroupBy {
dims[dim] = struct{}{}
}
call := &influxql.Call{
Name: "max",
Args: expr.Args[:1],
}
callOpt := opt
callOpt.Expr = call
callOpt.GroupBy = dims
callOpt.Fill = influxql.NoFill
builder := *b
builder.opt = callOpt
builder.selector = true
builder.writeMode = false
i, err := builder.callIterator(call, callOpt)
if err != nil {
return nil, err
}
input = i
} else {
// There are no arguments so do not organize the points by tags.
builder := *b
builder.opt.Expr = expr.Args[0]
builder.selector = true
builder.writeMode = false
ref := expr.Args[0].(*influxql.VarRef)
i, err := builder.buildVarRefIterator(ref)
if err != nil {
return nil, err
}
input = i
}
n := expr.Args[len(expr.Args)-1].(*influxql.IntegerLiteral)
return newTopIterator(input, opt, int(n.Val), b.writeMode)
case "bottom":
if len(expr.Args) < 2 {
return nil, fmt.Errorf("bottom() requires 2 or more arguments, got %d", len(expr.Args))
}
var input Iterator
if len(expr.Args) > 2 {
// Create a max iterator using the groupings in the arguments.
dims := make(map[string]struct{}, len(expr.Args)-2)
for i := 1; i < len(expr.Args)-1; i++ {
ref := expr.Args[i].(*influxql.VarRef)
dims[ref.Val] = struct{}{}
}
for dim := range opt.GroupBy {
dims[dim] = struct{}{}
}
call := &influxql.Call{
Name: "min",
Args: expr.Args[:1],
}
callOpt := opt
callOpt.Expr = call
callOpt.GroupBy = dims
callOpt.Fill = influxql.NoFill
builder := *b
builder.opt = callOpt
builder.selector = true
builder.writeMode = false
i, err := builder.callIterator(call, callOpt)
if err != nil {
return nil, err
}
input = i
} else {
// There are no arguments so do not organize the points by tags.
builder := *b
builder.opt.Expr = expr.Args[0]
builder.selector = true
builder.writeMode = false
ref := expr.Args[0].(*influxql.VarRef)
i, err := builder.buildVarRefIterator(ref)
if err != nil {
return nil, err
}
input = i
}
n := expr.Args[len(expr.Args)-1].(*influxql.IntegerLiteral)
return newBottomIterator(input, b.opt, int(n.Val), b.writeMode)
}
itr, err := func() (Iterator, error) {
switch expr.Name {
case "count":
switch arg0 := expr.Args[0].(type) {
case *influxql.Call:
if arg0.Name == "distinct" {
input, err := buildExprIterator(arg0, b.ic, b.sources, opt, b.selector, false)
if err != nil {
return nil, err
}
return newCountIterator(input, opt)
}
}
fallthrough
case "min", "max", "sum", "first", "last", "mean":
return b.callIterator(expr, opt)
case "median":
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*influxql.VarRef), b.ic, b.sources, opt, false, false)
if err != nil {
return nil, err
}
return newMedianIterator(input, opt)
case "mode":
input, err := buildExprIterator(expr.Args[0].(*influxql.VarRef), b.ic, b.sources, opt, false, false)
if err != nil {
return nil, err
}
return NewModeIterator(input, opt)
case "stddev":
input, err := buildExprIterator(expr.Args[0].(*influxql.VarRef), b.ic, b.sources, opt, false, false)
if err != nil {
return nil, err
2015-11-04 21:06:06 +00:00
}
return newStddevIterator(input, opt)
case "spread":
// OPTIMIZE(benbjohnson): convert to map/reduce
input, err := buildExprIterator(expr.Args[0].(*influxql.VarRef), b.ic, b.sources, opt, false, false)
2015-11-04 21:06:06 +00:00
if err != nil {
return nil, err
}
return newSpreadIterator(input, opt)
case "percentile":
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*influxql.VarRef), b.ic, b.sources, opt, false, false)
2015-11-04 21:06:06 +00:00
if err != nil {
return nil, err
}
var percentile float64
switch arg := expr.Args[1].(type) {
case *influxql.NumberLiteral:
percentile = arg.Val
case *influxql.IntegerLiteral:
percentile = float64(arg.Val)
}
return newPercentileIterator(input, opt, percentile)
default:
return nil, fmt.Errorf("unsupported call: %s", expr.Name)
2015-11-04 21:06:06 +00:00
}
}()
if err != nil {
return nil, err
}
if !b.selector || !opt.Interval.IsZero() {
itr = NewIntervalIterator(itr, opt)
if !opt.Interval.IsZero() && opt.Fill != influxql.NoFill {
itr = NewFillIterator(itr, expr, opt)
}
}
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}
func (b *exprIteratorBuilder) buildBinaryExprIterator(expr *influxql.BinaryExpr) (Iterator, error) {
if rhs, ok := expr.RHS.(influxql.Literal); ok {
// The right hand side is a literal. It is more common to have the RHS be a literal,
// so we check that one first and have this be the happy path.
if lhs, ok := expr.LHS.(influxql.Literal); ok {
// We have two literals that couldn't be combined by Reduce.
return nil, fmt.Errorf("unable to construct an iterator from two literals: LHS: %T, RHS: %T", lhs, rhs)
}
lhs, err := buildExprIterator(expr.LHS, b.ic, b.sources, b.opt, b.selector, false)
if err != nil {
return nil, err
}
return buildRHSTransformIterator(lhs, rhs, expr.Op, b.opt)
} else if lhs, ok := expr.LHS.(influxql.Literal); ok {
rhs, err := buildExprIterator(expr.RHS, b.ic, b.sources, b.opt, b.selector, false)
if err != nil {
return nil, err
}
return buildLHSTransformIterator(lhs, rhs, expr.Op, b.opt)
} else {
// We have two iterators. Combine them into a single iterator.
lhs, err := buildExprIterator(expr.LHS, b.ic, b.sources, b.opt, false, false)
if err != nil {
return nil, err
}
rhs, err := buildExprIterator(expr.RHS, b.ic, b.sources, b.opt, false, false)
if err != nil {
return nil, err
}
return buildTransformIterator(lhs, rhs, expr.Op, b.opt)
2015-11-04 21:06:06 +00:00
}
}
func (b *exprIteratorBuilder) callIterator(expr *influxql.Call, opt IteratorOptions) (Iterator, error) {
inputs := make([]Iterator, 0, len(b.sources))
if err := func() error {
for _, source := range b.sources {
switch source := source.(type) {
case *influxql.Measurement:
input, err := b.ic.CreateIterator(source, opt)
if err != nil {
return err
}
inputs = append(inputs, input)
case *influxql.SubQuery:
// Identify the name of the field we are using.
arg0 := expr.Args[0].(*influxql.VarRef)
input, err := buildExprIterator(arg0, b.ic, []influxql.Source{source}, opt, b.selector, false)
if err != nil {
return err
}
// Wrap the result in a call iterator.
i, err := NewCallIterator(input, opt)
if err != nil {
input.Close()
return err
}
inputs = append(inputs, i)
}
}
return nil
}(); err != nil {
Iterators(inputs).Close()
return nil, err
}
itr, err := Iterators(inputs).Merge(opt)
if err != nil {
Iterators(inputs).Close()
return nil, err
} else if itr == nil {
itr = &nilFloatIterator{}
}
return itr, nil
}
func buildRHSTransformIterator(lhs Iterator, rhs influxql.Literal, op influxql.Token, opt IteratorOptions) (Iterator, error) {
fn := binaryExprFunc(iteratorDataType(lhs), literalDataType(rhs), op)
2015-11-04 21:06:06 +00:00
switch fn := fn.(type) {
case func(float64, float64) float64:
var input FloatIterator
switch lhs := lhs.(type) {
case FloatIterator:
input = lhs
case IntegerIterator:
input = &integerFloatCastIterator{input: lhs}
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs)
2015-11-04 21:06:06 +00:00
}
var val float64
switch rhs := rhs.(type) {
case *influxql.NumberLiteral:
val = rhs.Val
case *influxql.IntegerLiteral:
val = float64(rhs.Val)
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs)
2015-11-04 21:06:06 +00:00
}
return &floatTransformIterator{
input: input,
fn: func(p *FloatPoint) *FloatPoint {
if p == nil {
return nil
} else if p.Nil {
return p
2015-11-04 21:06:06 +00:00
}
p.Value = fn(p.Value, val)
2015-11-04 21:06:06 +00:00
return p
},
}, nil
case func(int64, int64) float64:
input, ok := lhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs)
}
var val int64
switch rhs := rhs.(type) {
case *influxql.IntegerLiteral:
val = rhs.Val
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerLiteral", rhs)
}
return &integerFloatTransformIterator{
input: input,
fn: func(p *IntegerPoint) *FloatPoint {
if p == nil {
return nil
}
fp := &FloatPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
}
if p.Nil {
fp.Nil = true
} else {
fp.Value = fn(p.Value, val)
}
return fp
},
}, nil
2015-11-04 21:06:06 +00:00
case func(float64, float64) bool:
var input FloatIterator
switch lhs := lhs.(type) {
case FloatIterator:
input = lhs
case IntegerIterator:
input = &integerFloatCastIterator{input: lhs}
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs)
2015-11-04 21:06:06 +00:00
}
var val float64
switch rhs := rhs.(type) {
case *influxql.NumberLiteral:
val = rhs.Val
case *influxql.IntegerLiteral:
val = float64(rhs.Val)
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs)
2015-11-04 21:06:06 +00:00
}
return &floatBoolTransformIterator{
input: input,
fn: func(p *FloatPoint) *BooleanPoint {
if p == nil {
return nil
}
bp := &BooleanPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
}
if p.Nil {
bp.Nil = true
} else {
bp.Value = fn(p.Value, val)
}
return bp
},
}, nil
case func(int64, int64) int64:
var input IntegerIterator
switch lhs := lhs.(type) {
case IntegerIterator:
input = lhs
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerIterator", lhs)
}
var val int64
switch rhs := rhs.(type) {
case *influxql.IntegerLiteral:
val = rhs.Val
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerLiteral", rhs)
}
return &integerTransformIterator{
input: input,
fn: func(p *IntegerPoint) *IntegerPoint {
if p == nil {
return nil
} else if p.Nil {
return p
}
p.Value = fn(p.Value, val)
return p
},
}, nil
case func(int64, int64) bool:
var input IntegerIterator
switch lhs := lhs.(type) {
case IntegerIterator:
input = lhs
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerIterator", lhs)
}
var val int64
switch rhs := rhs.(type) {
case *influxql.IntegerLiteral:
val = rhs.Val
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerLiteral", rhs)
}
return &integerBoolTransformIterator{
input: input,
fn: func(p *IntegerPoint) *BooleanPoint {
if p == nil {
return nil
}
bp := &BooleanPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
}
if p.Nil {
bp.Nil = true
} else {
bp.Value = fn(p.Value, val)
}
return bp
},
}, nil
case func(bool, bool) bool:
var input BooleanIterator
switch lhs := lhs.(type) {
case BooleanIterator:
input = lhs
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an BooleanIterator", lhs)
}
var val bool
switch rhs := rhs.(type) {
case *influxql.BooleanLiteral:
val = rhs.Val
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an BooleanLiteral", rhs)
}
return &booleanTransformIterator{
input: input,
fn: func(p *BooleanPoint) *BooleanPoint {
if p == nil {
return nil
}
bp := &BooleanPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
}
if p.Nil {
bp.Nil = true
} else {
bp.Value = fn(p.Value, val)
2015-11-04 21:06:06 +00:00
}
return bp
2015-11-04 21:06:06 +00:00
},
}, nil
}
2016-01-18 22:48:49 +00:00
return nil, fmt.Errorf("unable to construct rhs transform iterator from %T and %T", lhs, rhs)
2015-11-04 21:06:06 +00:00
}
func buildLHSTransformIterator(lhs influxql.Literal, rhs Iterator, op influxql.Token, opt IteratorOptions) (Iterator, error) {
fn := binaryExprFunc(literalDataType(lhs), iteratorDataType(rhs), op)
2015-11-04 21:06:06 +00:00
switch fn := fn.(type) {
case func(float64, float64) float64:
var input FloatIterator
switch rhs := rhs.(type) {
case FloatIterator:
input = rhs
case IntegerIterator:
input = &integerFloatCastIterator{input: rhs}
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs)
2015-11-04 21:06:06 +00:00
}
var val float64
switch lhs := lhs.(type) {
case *influxql.NumberLiteral:
val = lhs.Val
case *influxql.IntegerLiteral:
val = float64(lhs.Val)
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a NumberLiteral", lhs)
2015-11-04 21:06:06 +00:00
}
return &floatTransformIterator{
input: input,
fn: func(p *FloatPoint) *FloatPoint {
if p == nil {
return nil
} else if p.Nil {
return p
2015-11-04 21:06:06 +00:00
}
p.Value = fn(val, p.Value)
2015-11-04 21:06:06 +00:00
return p
},
}, nil
case func(int64, int64) float64:
input, ok := rhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", lhs)
}
var val int64
switch lhs := lhs.(type) {
case *influxql.IntegerLiteral:
val = lhs.Val
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerLiteral", rhs)
}
return &integerFloatTransformIterator{
input: input,
fn: func(p *IntegerPoint) *FloatPoint {
if p == nil {
return nil
}
fp := &FloatPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
}
if p.Nil {
fp.Nil = true
} else {
fp.Value = fn(val, p.Value)
}
return fp
},
}, nil
2015-11-04 21:06:06 +00:00
case func(float64, float64) bool:
var input FloatIterator
switch rhs := rhs.(type) {
case FloatIterator:
input = rhs
case IntegerIterator:
input = &integerFloatCastIterator{input: rhs}
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs)
2015-11-04 21:06:06 +00:00
}
var val float64
switch lhs := lhs.(type) {
case *influxql.NumberLiteral:
val = lhs.Val
case *influxql.IntegerLiteral:
val = float64(lhs.Val)
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a NumberLiteral", lhs)
2015-11-04 21:06:06 +00:00
}
return &floatBoolTransformIterator{
input: input,
fn: func(p *FloatPoint) *BooleanPoint {
if p == nil {
return nil
}
bp := &BooleanPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
2015-11-04 21:06:06 +00:00
}
if p.Nil {
bp.Nil = true
} else {
bp.Value = fn(val, p.Value)
}
return bp
},
}, nil
case func(int64, int64) int64:
var input IntegerIterator
switch rhs := rhs.(type) {
case IntegerIterator:
input = rhs
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerIterator", rhs)
}
var val int64
switch lhs := lhs.(type) {
case *influxql.IntegerLiteral:
val = lhs.Val
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerLiteral", lhs)
}
return &integerTransformIterator{
input: input,
fn: func(p *IntegerPoint) *IntegerPoint {
if p == nil {
return nil
} else if p.Nil {
return p
}
p.Value = fn(val, p.Value)
return p
},
}, nil
case func(int64, int64) bool:
var input IntegerIterator
switch rhs := rhs.(type) {
case IntegerIterator:
input = rhs
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerIterator", rhs)
}
var val int64
switch lhs := lhs.(type) {
case *influxql.IntegerLiteral:
val = lhs.Val
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerLiteral", lhs)
}
return &integerBoolTransformIterator{
input: input,
fn: func(p *IntegerPoint) *BooleanPoint {
if p == nil {
return nil
}
bp := &BooleanPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
}
if p.Nil {
bp.Nil = true
} else {
bp.Value = fn(val, p.Value)
}
return bp
},
}, nil
case func(bool, bool) bool:
var input BooleanIterator
switch rhs := rhs.(type) {
case BooleanIterator:
input = rhs
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an BooleanIterator", rhs)
}
var val bool
switch lhs := lhs.(type) {
case *influxql.BooleanLiteral:
val = lhs.Val
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a BooleanLiteral", lhs)
}
return &booleanTransformIterator{
input: input,
fn: func(p *BooleanPoint) *BooleanPoint {
if p == nil {
return nil
}
bp := &BooleanPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Aux: p.Aux,
}
if p.Nil {
bp.Nil = true
} else {
bp.Value = fn(val, p.Value)
}
return bp
2015-11-04 21:06:06 +00:00
},
}, nil
}
2016-01-18 22:48:49 +00:00
return nil, fmt.Errorf("unable to construct lhs transform iterator from %T and %T", lhs, rhs)
2015-11-04 21:06:06 +00:00
}
func buildTransformIterator(lhs Iterator, rhs Iterator, op influxql.Token, opt IteratorOptions) (Iterator, error) {
fn := binaryExprFunc(iteratorDataType(lhs), iteratorDataType(rhs), op)
2015-11-04 21:06:06 +00:00
switch fn := fn.(type) {
case func(float64, float64) float64:
var left FloatIterator
switch lhs := lhs.(type) {
case FloatIterator:
left = lhs
case IntegerIterator:
left = &integerFloatCastIterator{input: lhs}
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs)
2015-11-04 21:06:06 +00:00
}
var right FloatIterator
switch rhs := rhs.(type) {
case FloatIterator:
right = rhs
case IntegerIterator:
right = &integerFloatCastIterator{input: rhs}
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs)
2015-11-04 21:06:06 +00:00
}
return newFloatExprIterator(left, right, opt, fn), nil
case func(int64, int64) float64:
left, ok := lhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs)
}
right, ok := rhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs)
}
return newIntegerFloatExprIterator(left, right, opt, fn), nil
2016-01-18 22:48:49 +00:00
case func(int64, int64) int64:
left, ok := lhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs)
2016-01-18 22:48:49 +00:00
}
right, ok := rhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs)
2016-01-18 22:48:49 +00:00
}
return newIntegerExprIterator(left, right, opt, fn), nil
2015-11-04 21:06:06 +00:00
case func(float64, float64) bool:
var left FloatIterator
switch lhs := lhs.(type) {
case FloatIterator:
left = lhs
case IntegerIterator:
left = &integerFloatCastIterator{input: lhs}
default:
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs)
2015-11-04 21:06:06 +00:00
}
var right FloatIterator
switch rhs := rhs.(type) {
case FloatIterator:
right = rhs
case IntegerIterator:
right = &integerFloatCastIterator{input: rhs}
default:
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs)
2015-11-04 21:06:06 +00:00
}
return newFloatBooleanExprIterator(left, right, opt, fn), nil
2016-01-18 22:48:49 +00:00
case func(int64, int64) bool:
left, ok := lhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs)
2016-01-18 22:48:49 +00:00
}
right, ok := rhs.(IntegerIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs)
2016-01-18 22:48:49 +00:00
}
return newIntegerBooleanExprIterator(left, right, opt, fn), nil
case func(bool, bool) bool:
left, ok := lhs.(BooleanIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a BooleanIterator", lhs)
}
right, ok := rhs.(BooleanIterator)
if !ok {
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a BooleanIterator", rhs)
}
return newBooleanExprIterator(left, right, opt, fn), nil
2015-11-04 21:06:06 +00:00
}
return nil, fmt.Errorf("unable to construct transform iterator from %T and %T", lhs, rhs)
}
func iteratorDataType(itr Iterator) influxql.DataType {
2015-11-04 21:06:06 +00:00
switch itr.(type) {
case FloatIterator:
return influxql.Float
2016-01-18 22:48:49 +00:00
case IntegerIterator:
return influxql.Integer
2015-11-04 21:06:06 +00:00
case StringIterator:
return influxql.String
2015-11-04 21:06:06 +00:00
case BooleanIterator:
return influxql.Boolean
2015-11-04 21:06:06 +00:00
default:
return influxql.Unknown
2015-11-04 21:06:06 +00:00
}
}
func literalDataType(lit influxql.Literal) influxql.DataType {
switch lit.(type) {
case *influxql.NumberLiteral:
return influxql.Float
case *influxql.IntegerLiteral:
return influxql.Integer
case *influxql.StringLiteral:
return influxql.String
case *influxql.BooleanLiteral:
return influxql.Boolean
default:
return influxql.Unknown
}
}
func binaryExprFunc(typ1 influxql.DataType, typ2 influxql.DataType, op influxql.Token) interface{} {
2016-01-18 22:48:49 +00:00
var fn interface{}
switch typ1 {
case influxql.Float:
2016-01-18 22:48:49 +00:00
fn = floatBinaryExprFunc(op)
case influxql.Integer:
switch typ2 {
case influxql.Float:
fn = floatBinaryExprFunc(op)
default:
fn = integerBinaryExprFunc(op)
}
case influxql.Boolean:
fn = booleanBinaryExprFunc(op)
2016-01-18 22:48:49 +00:00
}
return fn
}
func floatBinaryExprFunc(op influxql.Token) interface{} {
2016-01-18 22:48:49 +00:00
switch op {
case influxql.ADD:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) float64 { return lhs + rhs }
case influxql.SUB:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) float64 { return lhs - rhs }
case influxql.MUL:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) float64 { return lhs * rhs }
case influxql.DIV:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) float64 {
if rhs == 0 {
return float64(0)
}
return lhs / rhs
}
case influxql.MOD:
return func(lhs, rhs float64) float64 { return math.Mod(lhs, rhs) }
case influxql.EQ:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) bool { return lhs == rhs }
case influxql.NEQ:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) bool { return lhs != rhs }
case influxql.LT:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) bool { return lhs < rhs }
case influxql.LTE:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) bool { return lhs <= rhs }
case influxql.GT:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) bool { return lhs > rhs }
case influxql.GTE:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs float64) bool { return lhs >= rhs }
}
return nil
}
func integerBinaryExprFunc(op influxql.Token) interface{} {
2016-01-18 22:48:49 +00:00
switch op {
case influxql.ADD:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) int64 { return lhs + rhs }
case influxql.SUB:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) int64 { return lhs - rhs }
case influxql.MUL:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) int64 { return lhs * rhs }
case influxql.DIV:
return func(lhs, rhs int64) float64 {
2016-01-18 22:48:49 +00:00
if rhs == 0 {
return float64(0)
2016-01-18 22:48:49 +00:00
}
return float64(lhs) / float64(rhs)
2015-11-04 21:06:06 +00:00
}
case influxql.MOD:
return func(lhs, rhs int64) int64 {
if rhs == 0 {
return int64(0)
}
return lhs % rhs
}
case influxql.BITWISE_AND:
return func(lhs, rhs int64) int64 { return lhs & rhs }
case influxql.BITWISE_OR:
return func(lhs, rhs int64) int64 { return lhs | rhs }
case influxql.BITWISE_XOR:
return func(lhs, rhs int64) int64 { return lhs ^ rhs }
case influxql.EQ:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) bool { return lhs == rhs }
case influxql.NEQ:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) bool { return lhs != rhs }
case influxql.LT:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) bool { return lhs < rhs }
case influxql.LTE:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) bool { return lhs <= rhs }
case influxql.GT:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) bool { return lhs > rhs }
case influxql.GTE:
2016-01-18 22:48:49 +00:00
return func(lhs, rhs int64) bool { return lhs >= rhs }
2015-11-04 21:06:06 +00:00
}
return nil
}
func booleanBinaryExprFunc(op influxql.Token) interface{} {
switch op {
case influxql.BITWISE_AND:
return func(lhs, rhs bool) bool { return lhs && rhs }
case influxql.BITWISE_OR:
return func(lhs, rhs bool) bool { return lhs || rhs }
case influxql.BITWISE_XOR:
return func(lhs, rhs bool) bool { return lhs != rhs }
}
return nil
}