diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index df05489e63..62eec3bdd7 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -34,7 +34,7 @@ function to allow it to be included during planning. */ // NewCallIterator returns a new iterator for a Call. -func NewCallIterator(input Iterator, opt IteratorOptions) Iterator { +func NewCallIterator(input Iterator, opt IteratorOptions) (Iterator, error) { name := opt.Expr.(*Call).Name switch name { case "count": @@ -52,21 +52,21 @@ func NewCallIterator(input Iterator, opt IteratorOptions) Iterator { case "mean": return newMeanIterator(input, opt) default: - panic(fmt.Sprintf("unsupported function call: %s", name)) + return nil, fmt.Errorf("unsupported function call: %s", name) } } // newCountIterator returns an iterator for operating on a count() call. -func newCountIterator(input Iterator, opt IteratorOptions) Iterator { +func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) { // FIXME: Wrap iterator in int-type iterator and always output int value. switch input := input.(type) { case FloatIterator: - return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatCountReduce} + return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatCountReduce}, nil case IntegerIterator: - return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerCountReduce} + return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerCountReduce}, nil default: - panic(fmt.Sprintf("unsupported count iterator type: %T", input)) + return nil, fmt.Errorf("unsupported count iterator type: %T", input) } } @@ -87,14 +87,14 @@ func integerCountReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, in } // newMinIterator returns an iterator for operating on a min() call. -func newMinIterator(input Iterator, opt IteratorOptions) Iterator { +func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMinReduce} + return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMinReduce}, nil case IntegerIterator: - return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMinReduce} + return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMinReduce}, nil default: - panic(fmt.Sprintf("unsupported min iterator type: %T", input)) + return nil, fmt.Errorf("unsupported min iterator type: %T", input) } } @@ -115,14 +115,14 @@ func integerMinReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int6 } // newMaxIterator returns an iterator for operating on a max() call. -func newMaxIterator(input Iterator, opt IteratorOptions) Iterator { +func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMaxReduce} + return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMaxReduce}, nil case IntegerIterator: - return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMaxReduce} + return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMaxReduce}, nil default: - panic(fmt.Sprintf("unsupported max iterator type: %T", input)) + return nil, fmt.Errorf("unsupported max iterator type: %T", input) } } @@ -143,14 +143,14 @@ func integerMaxReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int6 } // newSumIterator returns an iterator for operating on a sum() call. -func newSumIterator(input Iterator, opt IteratorOptions) Iterator { +func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSumReduce} + return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSumReduce}, nil case IntegerIterator: - return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSumReduce} + return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSumReduce}, nil default: - panic(fmt.Sprintf("unsupported sum iterator type: %T", input)) + return nil, fmt.Errorf("unsupported sum iterator type: %T", input) } } @@ -171,14 +171,14 @@ func integerSumReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int6 } // newFirstIterator returns an iterator for operating on a first() call. -func newFirstIterator(input Iterator, opt IteratorOptions) Iterator { +func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatFirstReduce} + return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatFirstReduce}, nil case IntegerIterator: - return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerFirstReduce} + return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerFirstReduce}, nil default: - panic(fmt.Sprintf("unsupported first iterator type: %T", input)) + return nil, fmt.Errorf("unsupported first iterator type: %T", input) } } @@ -199,14 +199,14 @@ func integerFirstReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, in } // newLastIterator returns an iterator for operating on a last() call. -func newLastIterator(input Iterator, opt IteratorOptions) Iterator { +func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatLastReduce} + return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatLastReduce}, nil case IntegerIterator: - return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerLastReduce} + return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerLastReduce}, nil default: - panic(fmt.Sprintf("unsupported last iterator type: %T", input)) + return nil, fmt.Errorf("unsupported last iterator type: %T", input) } } @@ -227,16 +227,16 @@ func integerLastReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int } // NewDistinctIterator returns an iterator for operating on a distinct() call. -func NewDistinctIterator(input Iterator, opt IteratorOptions) Iterator { +func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatDistinctReduceSlice} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatDistinctReduceSlice}, nil case IntegerIterator: - return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerDistinctReduceSlice} + return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerDistinctReduceSlice}, nil case StringIterator: - return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringDistinctReduceSlice} + return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringDistinctReduceSlice}, nil default: - panic(fmt.Sprintf("unsupported distinct iterator type: %T", input)) + return nil, fmt.Errorf("unsupported distinct iterator type: %T", input) } } @@ -292,14 +292,14 @@ func stringDistinctReduceSlice(a []StringPoint, opt *reduceOptions) []StringPoin } // newMeanIterator returns an iterator for operating on a mean() call. -func newMeanIterator(input Iterator, opt IteratorOptions) Iterator { +func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMeanReduce} + return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMeanReduce}, nil case IntegerIterator: - return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMeanReduce} + return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMeanReduce}, nil default: - panic(fmt.Sprintf("unsupported mean iterator type: %T", input)) + return nil, fmt.Errorf("unsupported mean iterator type: %T", input) } } @@ -338,14 +338,14 @@ func integerMeanReduce(prev *FloatPoint, curr *IntegerPoint, opt *reduceOptions) } // newMedianIterator returns an iterator for operating on a median() call. -func newMedianIterator(input Iterator, opt IteratorOptions) Iterator { +func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMedianReduceSlice} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMedianReduceSlice}, nil case IntegerIterator: - return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMedianReduceSlice} + return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMedianReduceSlice}, nil default: - panic(fmt.Sprintf("unsupported median iterator type: %T", input)) + return nil, fmt.Errorf("unsupported median iterator type: %T", input) } } @@ -386,16 +386,16 @@ func integerMedianReduceSlice(a []IntegerPoint, opt *reduceOptions) []FloatPoint } // newStddevIterator returns an iterator for operating on a stddev() call. -func newStddevIterator(input Iterator, opt IteratorOptions) Iterator { +func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatStddevReduceSlice} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatStddevReduceSlice}, nil case IntegerIterator: - return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerStddevReduceSlice} + return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerStddevReduceSlice}, nil case StringIterator: - return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringStddevReduceSlice} + return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringStddevReduceSlice}, nil default: - panic(fmt.Sprintf("unsupported stddev iterator type: %T", input)) + return nil, fmt.Errorf("unsupported stddev iterator type: %T", input) } } @@ -463,14 +463,14 @@ func stringStddevReduceSlice(a []StringPoint, opt *reduceOptions) []StringPoint } // newSpreadIterator returns an iterator for operating on a spread() call. -func newSpreadIterator(input Iterator, opt IteratorOptions) Iterator { +func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSpreadReduceSlice} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSpreadReduceSlice}, nil case IntegerIterator: - return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSpreadReduceSlice} + return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSpreadReduceSlice}, nil default: - panic(fmt.Sprintf("unsupported spread iterator type: %T", input)) + return nil, fmt.Errorf("unsupported spread iterator type: %T", input) } } @@ -501,14 +501,14 @@ func integerSpreadReduceSlice(a []IntegerPoint, opt *reduceOptions) []IntegerPoi } // newTopIterator returns an iterator for operating on a top() call. -func newTopIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) Iterator { +func newTopIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatTopReduceSliceFunc(int(n.Val), tags, opt.Interval)} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil case IntegerIterator: - return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerTopReduceSliceFunc(int(n.Val), tags, opt.Interval)} + return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil default: - panic(fmt.Sprintf("unsupported top iterator type: %T", input)) + return nil, fmt.Errorf("unsupported top iterator type: %T", input) } } @@ -605,14 +605,14 @@ func newIntegerTopReduceSliceFunc(n int, tags []int, interval Interval) integerR } // newBottomIterator returns an iterator for operating on a bottom() call. -func newBottomIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) Iterator { +func newBottomIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil case IntegerIterator: - return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)} + return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil default: - panic(fmt.Sprintf("unsupported bottom iterator type: %T", input)) + return nil, fmt.Errorf("unsupported bottom iterator type: %T", input) } } @@ -769,14 +769,14 @@ func filterIntegerByUniqueTags(a []IntegerPoint, tags []int, cmpFunc func(cur, p } // newPercentileIterator returns an iterator for operating on a percentile() call. -func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) Iterator { +func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatPercentileReduceSliceFunc(percentile)} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatPercentileReduceSliceFunc(percentile)}, nil case IntegerIterator: - return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerPercentileReduceSliceFunc(percentile)} + return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerPercentileReduceSliceFunc(percentile)}, nil default: - panic(fmt.Sprintf("unsupported percentile iterator type: %T", input)) + return nil, fmt.Errorf("unsupported percentile iterator type: %T", input) } } @@ -811,14 +811,14 @@ func newIntegerPercentileReduceSliceFunc(percentile float64) integerReduceSliceF } // newDerivativeIterator returns an iterator for operating on a derivative() call. -func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) Iterator { +func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) { switch input := input.(type) { case FloatIterator: - return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatDerivativeReduceSliceFunc(interval, isNonNegative)} + return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatDerivativeReduceSliceFunc(interval, isNonNegative)}, nil case IntegerIterator: - return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerDerivativeReduceSliceFunc(interval, isNonNegative)} + return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerDerivativeReduceSliceFunc(interval, isNonNegative)}, nil default: - panic(fmt.Sprintf("unsupported derivative iterator type: %T", input)) + return nil, fmt.Errorf("unsupported derivative iterator type: %T", input) } } diff --git a/influxql/call_iterator_test.go b/influxql/call_iterator_test.go index 492faf62b8..cb63a487ad 100644 --- a/influxql/call_iterator_test.go +++ b/influxql/call_iterator_test.go @@ -12,7 +12,7 @@ import ( // Ensure that a float iterator can be created for a count() call. func TestCallIterator_Count_Float(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -44,7 +44,7 @@ func TestCallIterator_Count_Float(t *testing.T) { // Ensure that an integer iterator can be created for a count() call. func TestCallIterator_Count_Integer(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -76,7 +76,7 @@ func TestCallIterator_Count_Integer(t *testing.T) { // Ensure that a float iterator can be created for a min() call. func TestCallIterator_Min_Float(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &FloatIterator{Points: []influxql.FloatPoint{ {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -107,7 +107,7 @@ func TestCallIterator_Min_Float(t *testing.T) { // Ensure that a integer iterator can be created for a min() call. func TestCallIterator_Min_Integer(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &IntegerIterator{Points: []influxql.IntegerPoint{ {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -138,7 +138,7 @@ func TestCallIterator_Min_Integer(t *testing.T) { // Ensure that a float iterator can be created for a max() call. func TestCallIterator_Max_Float(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &FloatIterator{Points: []influxql.FloatPoint{ {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -168,7 +168,7 @@ func TestCallIterator_Max_Float(t *testing.T) { // Ensure that a integer iterator can be created for a max() call. func TestCallIterator_Max_Integer(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &IntegerIterator{Points: []influxql.IntegerPoint{ {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -198,7 +198,7 @@ func TestCallIterator_Max_Integer(t *testing.T) { // Ensure that a float iterator can be created for a sum() call. func TestCallIterator_Sum_Float(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &FloatIterator{Points: []influxql.FloatPoint{ {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -228,7 +228,7 @@ func TestCallIterator_Sum_Float(t *testing.T) { // Ensure that an integer iterator can be created for a sum() call. func TestCallIterator_Sum_Integer(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &IntegerIterator{Points: []influxql.IntegerPoint{ {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, @@ -258,7 +258,7 @@ func TestCallIterator_Sum_Integer(t *testing.T) { // Ensure that a float iterator can be created for a first() call. func TestCallIterator_First_Float(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &FloatIterator{Points: []influxql.FloatPoint{ {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, @@ -288,7 +288,7 @@ func TestCallIterator_First_Float(t *testing.T) { // Ensure that an integer iterator can be created for a first() call. func TestCallIterator_First_Integer(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &IntegerIterator{Points: []influxql.IntegerPoint{ {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, @@ -318,7 +318,7 @@ func TestCallIterator_First_Integer(t *testing.T) { // Ensure that a float iterator can be created for a last() call. func TestCallIterator_Last_Float(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &FloatIterator{Points: []influxql.FloatPoint{ {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, @@ -348,7 +348,7 @@ func TestCallIterator_Last_Float(t *testing.T) { // Ensure that an integer iterator can be created for a last() call. func TestCallIterator_Last_Integer(t *testing.T) { - itr := influxql.NewCallIterator( + itr, _ := influxql.NewCallIterator( &IntegerIterator{Points: []influxql.IntegerPoint{ {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, @@ -376,18 +376,40 @@ func TestCallIterator_Last_Integer(t *testing.T) { } } +func TestNewCallIterator_UnsupportedExprName(t *testing.T) { + _, err := influxql.NewCallIterator( + &FloatIterator{}, + influxql.IteratorOptions{ + Expr: MustParseExpr(`foobar("value")`), + }, + ) + + if err == nil || err.Error() != "unsupported function call: foobar" { + t.Errorf("unexpected error: %s", err) + } +} + func BenchmarkCallIterator_Min_Float(b *testing.B) { input := GenerateFloatIterator(rand.New(rand.NewSource(0)), b.N) b.ResetTimer() b.ReportAllocs() - itr := influxql.NewCallIterator(input, influxql.IteratorOptions{ + itr, err := influxql.NewCallIterator(input, influxql.IteratorOptions{ Expr: MustParseExpr("min(value)"), Interval: influxql.Interval{Duration: 1 * time.Hour}, - }).(influxql.FloatIterator) - for { - if p := itr.Next(); p == nil { - break + }) + if err != nil { + b.Fatal(err) + } + + switch itr := itr.(type) { + case influxql.FloatIterator: + for { + if p := itr.Next(); p == nil { + break + } } + default: + b.Fatalf("incorrect iterator type: %T", itr) } } diff --git a/influxql/select.go b/influxql/select.go index fa419aee42..e0d916a267 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -209,104 +209,13 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter case *Call: // FIXME(benbjohnson): Validate that only calls with 1 arg are passed to IC. - var err error - var itr Iterator switch expr.Name { - case "count": - switch arg := expr.Args[0].(type) { - case *Call: - if arg.Name == "distinct" { - input, err := buildExprIterator(arg, ic, opt) - if err != nil { - return nil, err - } - itr = newCountIterator(input, opt) - } - default: - itr, err = ic.CreateIterator(opt) - } - case "min", "max", "sum", "first", "last", "mean": - itr, err = ic.CreateIterator(opt) case "distinct": input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) if err != nil { return nil, err } - return NewDistinctIterator(input, opt), nil - case "median": - input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) - if err != nil { - return nil, err - } - itr = newMedianIterator(input, opt) - case "stddev": - input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) - if err != nil { - return nil, err - } - itr = newStddevIterator(input, opt) - case "spread": - // OPTIMIZE(benbjohnson): convert to map/reduce - input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) - if err != nil { - return nil, err - } - itr = newSpreadIterator(input, opt) - case "top": - var tags []int - if len(expr.Args) < 2 { - return nil, fmt.Errorf("top() requires 2 or more arguments, got %d", len(expr.Args)) - } else if len(expr.Args) > 2 { - // We need to find the indices of where the tag values are stored in Aux - // This section is O(n^2), but for what should be a low value. - for i := 1; i < len(expr.Args)-1; i++ { - ref := expr.Args[i].(*VarRef) - for index, name := range opt.Aux { - if name == ref.Val { - tags = append(tags, index) - break - } - } - } - } - - input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) - if err != nil { - return nil, err - } - n := expr.Args[len(expr.Args)-1].(*NumberLiteral) - itr = newTopIterator(input, opt, n, tags) - case "bottom": - var tags []int - if len(expr.Args) < 2 { - return nil, fmt.Errorf("bottom() requires 2 or more arguments, got %d", len(expr.Args)) - } else if len(expr.Args) > 2 { - // We need to find the indices of where the tag values are stored in Aux - // This section is O(n^2), but for what should be a low value. - for i := 1; i < len(expr.Args)-1; i++ { - ref := expr.Args[i].(*VarRef) - for index, name := range opt.Aux { - if name == ref.Val { - tags = append(tags, index) - break - } - } - } - } - - input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) - if err != nil { - return nil, err - } - n := expr.Args[len(expr.Args)-1].(*NumberLiteral) - itr = newBottomIterator(input, opt, n, tags) - case "percentile": - input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) - if err != nil { - return nil, err - } - percentile := expr.Args[1].(*NumberLiteral).Val - itr = newPercentileIterator(input, opt, percentile) + return NewDistinctIterator(input, opt) case "derivative", "non_negative_derivative": input, err := buildExprIterator(expr.Args[0], ic, opt) if err != nil { @@ -319,19 +228,112 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter // Derivatives do not use GROUP BY intervals or time constraints, so clear these options. opt.Interval = Interval{} opt.StartTime, opt.EndTime = MinTime, MaxTime - return newDerivativeIterator(input, opt, interval, isNonNegative), nil + return newDerivativeIterator(input, opt, interval, isNonNegative) default: - return nil, fmt.Errorf("unsupported call: %s", expr.Name) - } + itr, err := func() (Iterator, error) { + switch expr.Name { + case "count": + switch arg := expr.Args[0].(type) { + case *Call: + if arg.Name == "distinct" { + input, err := buildExprIterator(arg, ic, opt) + if err != nil { + return nil, err + } + return newCountIterator(input, opt) + } + } + return ic.CreateIterator(opt) + case "min", "max", "sum", "first", "last", "mean": + return ic.CreateIterator(opt) + case "median": + input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) + if err != nil { + return nil, err + } + return newMedianIterator(input, opt) + case "stddev": + input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) + if err != nil { + return nil, err + } + return newStddevIterator(input, opt) + case "spread": + // OPTIMIZE(benbjohnson): convert to map/reduce + input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) + if err != nil { + return nil, err + } + return newSpreadIterator(input, opt) + case "top": + var tags []int + if len(expr.Args) < 2 { + return nil, fmt.Errorf("top() requires 2 or more arguments, got %d", len(expr.Args)) + } else if len(expr.Args) > 2 { + // We need to find the indices of where the tag values are stored in Aux + // This section is O(n^2), but for what should be a low value. + for i := 1; i < len(expr.Args)-1; i++ { + ref := expr.Args[i].(*VarRef) + for index, name := range opt.Aux { + if name == ref.Val { + tags = append(tags, index) + break + } + } + } + } - if err != nil { - return nil, err - } + input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) + if err != nil { + return nil, err + } + n := expr.Args[len(expr.Args)-1].(*NumberLiteral) + return newTopIterator(input, opt, n, tags) + case "bottom": + var tags []int + if len(expr.Args) < 2 { + return nil, fmt.Errorf("bottom() requires 2 or more arguments, got %d", len(expr.Args)) + } else if len(expr.Args) > 2 { + // We need to find the indices of where the tag values are stored in Aux + // This section is O(n^2), but for what should be a low value. + for i := 1; i < len(expr.Args)-1; i++ { + ref := expr.Args[i].(*VarRef) + for index, name := range opt.Aux { + if name == ref.Val { + tags = append(tags, index) + break + } + } + } + } - if !opt.Interval.IsZero() && opt.Fill != NoFill { - itr = NewFillIterator(itr, expr, opt) + input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) + if err != nil { + return nil, err + } + n := expr.Args[len(expr.Args)-1].(*NumberLiteral) + return newBottomIterator(input, opt, n, tags) + case "percentile": + input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt) + if err != nil { + return nil, err + } + percentile := expr.Args[1].(*NumberLiteral).Val + return newPercentileIterator(input, opt, percentile) + default: + return nil, fmt.Errorf("unsupported call: %s", expr.Name) + } + }() + + if err != nil { + return nil, err + } + + if !opt.Interval.IsZero() && opt.Fill != NoFill { + itr = NewFillIterator(itr, expr, opt) + } + return itr, nil } - return itr, nil case *BinaryExpr: if rhs, ok := expr.RHS.(Literal); ok { // The right hand side is a literal. It is more common to have the RHS be a literal, diff --git a/influxql/select_test.go b/influxql/select_test.go index 77dbe38c87..cf7772486e 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -28,7 +28,7 @@ func TestSelect_Min(t *testing.T) { {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, - }}, opt), nil + }}, opt) } // Execute selection. @@ -132,6 +132,24 @@ func TestSelect_Distinct_String(t *testing.T) { } } +// Ensure a SELECT distinct() query cannot be executed on booleans. +func TestSelect_Distinct_Boolean(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &BooleanIterator{}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT distinct(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err == nil || err.Error() != "unsupported distinct iterator type: *influxql_test.BooleanIterator" { + t.Errorf("unexpected error: %s", err) + } + + if itrs != nil { + influxql.Iterators(itrs).Close() + } +} + // Ensure a SELECT mean() query can be executed. func TestSelect_Mean_Float(t *testing.T) { var ic IteratorCreator @@ -149,7 +167,7 @@ func TestSelect_Mean_Float(t *testing.T) { {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 4}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, - }}, opt), nil + }}, opt) } // Execute selection. @@ -184,7 +202,7 @@ func TestSelect_Mean_Integer(t *testing.T) { {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 4}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, - }}, opt), nil + }}, opt) } // Execute selection. @@ -202,6 +220,42 @@ func TestSelect_Mean_Integer(t *testing.T) { } } +// Ensure a SELECT mean() query cannot be executed on strings. +func TestSelect_Mean_String(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return influxql.NewCallIterator(&StringIterator{}, opt) + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err == nil || err.Error() != "unsupported mean iterator type: *influxql_test.StringIterator" { + t.Errorf("unexpected error: %s", err) + } + + if itrs != nil { + influxql.Iterators(itrs).Close() + } +} + +// Ensure a SELECT mean() query cannot be executed on booleans. +func TestSelect_Mean_Boolean(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return influxql.NewCallIterator(&BooleanIterator{}, opt) + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err == nil || err.Error() != "unsupported mean iterator type: *influxql_test.BooleanIterator" { + t.Errorf("unexpected error: %s", err) + } + + if itrs != nil { + influxql.Iterators(itrs).Close() + } +} + // Ensure a SELECT median() query can be executed. func TestSelect_Median_Float(t *testing.T) { var ic IteratorCreator @@ -272,6 +326,42 @@ func TestSelect_Median_Integer(t *testing.T) { } } +// Ensure a SELECT median() query cannot be executed on strings. +func TestSelect_Median_String(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &StringIterator{}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT median(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err == nil || err.Error() != "unsupported median iterator type: *influxql_test.StringIterator" { + t.Errorf("unexpected error: %s", err) + } + + if itrs != nil { + influxql.Iterators(itrs).Close() + } +} + +// Ensure a SELECT median() query cannot be executed on booleans. +func TestSelect_Median_Boolean(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &BooleanIterator{}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT median(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err == nil || err.Error() != "unsupported median iterator type: *influxql_test.BooleanIterator" { + t.Errorf("unexpected error: %s", err) + } + + if itrs != nil { + influxql.Iterators(itrs).Close() + } +} + // Ensure a SELECT top() query can be executed. func TestSelect_Top_NoTags_Float(t *testing.T) { var ic IteratorCreator @@ -774,7 +864,7 @@ func TestSelect_Fill_Null_Float(t *testing.T) { ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, - }}, opt), nil + }}, opt) } // Execute selection. @@ -799,7 +889,7 @@ func TestSelect_Fill_Number_Float(t *testing.T) { ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, - }}, opt), nil + }}, opt) } // Execute selection. @@ -824,7 +914,7 @@ func TestSelect_Fill_Previous_Float(t *testing.T) { ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, - }}, opt), nil + }}, opt) } // Execute selection. @@ -1495,7 +1585,7 @@ func TestSelect_ParenExpr(t *testing.T) { {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, - }}, opt), nil + }}, opt) } // Execute selection. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index b01e51c361..3bb3f9ada5 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -660,7 +660,7 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator if err != nil { return nil, err } - return influxql.NewCallIterator(influxql.NewMergeIterator(inputs, opt), opt), nil + return influxql.NewCallIterator(influxql.NewMergeIterator(inputs, opt), opt) } itrs, err := e.createVarRefIterator(opt) diff --git a/tsdb/shard.go b/tsdb/shard.go index 77657a1b55..941c2dd741 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -463,7 +463,7 @@ func (a Shards) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, } } } - return influxql.NewCallIterator(itr, opt), nil + return influxql.NewCallIterator(itr, opt) } // createSystemIterator returns an iterator for a system source.