Remove the non-unreachable panics in the new query engine

The only panics left are ones that should be unreachable unless there is
a bug.

Fixes #5777.
pull/5782/head
Jonathan A. Sternberg 2016-02-22 12:51:45 -05:00
parent b6a0b6a65a
commit 7a03df2af1
6 changed files with 305 additions and 191 deletions

View File

@ -34,7 +34,7 @@ function to allow it to be included during planning.
*/
// NewCallIterator returns a new iterator for a Call.
func NewCallIterator(input Iterator, opt IteratorOptions) Iterator {
func NewCallIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
name := opt.Expr.(*Call).Name
switch name {
case "count":
@ -52,21 +52,21 @@ func NewCallIterator(input Iterator, opt IteratorOptions) Iterator {
case "mean":
return newMeanIterator(input, opt)
default:
panic(fmt.Sprintf("unsupported function call: %s", name))
return nil, fmt.Errorf("unsupported function call: %s", name)
}
}
// newCountIterator returns an iterator for operating on a count() call.
func newCountIterator(input Iterator, opt IteratorOptions) Iterator {
func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
// FIXME: Wrap iterator in int-type iterator and always output int value.
switch input := input.(type) {
case FloatIterator:
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatCountReduce}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatCountReduce}, nil
case IntegerIterator:
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerCountReduce}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerCountReduce}, nil
default:
panic(fmt.Sprintf("unsupported count iterator type: %T", input))
return nil, fmt.Errorf("unsupported count iterator type: %T", input)
}
}
@ -87,14 +87,14 @@ func integerCountReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, in
}
// newMinIterator returns an iterator for operating on a min() call.
func newMinIterator(input Iterator, opt IteratorOptions) Iterator {
func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMinReduce}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMinReduce}, nil
case IntegerIterator:
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMinReduce}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMinReduce}, nil
default:
panic(fmt.Sprintf("unsupported min iterator type: %T", input))
return nil, fmt.Errorf("unsupported min iterator type: %T", input)
}
}
@ -115,14 +115,14 @@ func integerMinReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int6
}
// newMaxIterator returns an iterator for operating on a max() call.
func newMaxIterator(input Iterator, opt IteratorOptions) Iterator {
func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMaxReduce}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMaxReduce}, nil
case IntegerIterator:
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMaxReduce}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMaxReduce}, nil
default:
panic(fmt.Sprintf("unsupported max iterator type: %T", input))
return nil, fmt.Errorf("unsupported max iterator type: %T", input)
}
}
@ -143,14 +143,14 @@ func integerMaxReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int6
}
// newSumIterator returns an iterator for operating on a sum() call.
func newSumIterator(input Iterator, opt IteratorOptions) Iterator {
func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSumReduce}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSumReduce}, nil
case IntegerIterator:
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSumReduce}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSumReduce}, nil
default:
panic(fmt.Sprintf("unsupported sum iterator type: %T", input))
return nil, fmt.Errorf("unsupported sum iterator type: %T", input)
}
}
@ -171,14 +171,14 @@ func integerSumReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int6
}
// newFirstIterator returns an iterator for operating on a first() call.
func newFirstIterator(input Iterator, opt IteratorOptions) Iterator {
func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatFirstReduce}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatFirstReduce}, nil
case IntegerIterator:
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerFirstReduce}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerFirstReduce}, nil
default:
panic(fmt.Sprintf("unsupported first iterator type: %T", input))
return nil, fmt.Errorf("unsupported first iterator type: %T", input)
}
}
@ -199,14 +199,14 @@ func integerFirstReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, in
}
// newLastIterator returns an iterator for operating on a last() call.
func newLastIterator(input Iterator, opt IteratorOptions) Iterator {
func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatLastReduce}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatLastReduce}, nil
case IntegerIterator:
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerLastReduce}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerLastReduce}, nil
default:
panic(fmt.Sprintf("unsupported last iterator type: %T", input))
return nil, fmt.Errorf("unsupported last iterator type: %T", input)
}
}
@ -227,16 +227,16 @@ func integerLastReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int
}
// NewDistinctIterator returns an iterator for operating on a distinct() call.
func NewDistinctIterator(input Iterator, opt IteratorOptions) Iterator {
func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatDistinctReduceSlice}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatDistinctReduceSlice}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerDistinctReduceSlice}
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerDistinctReduceSlice}, nil
case StringIterator:
return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringDistinctReduceSlice}
return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringDistinctReduceSlice}, nil
default:
panic(fmt.Sprintf("unsupported distinct iterator type: %T", input))
return nil, fmt.Errorf("unsupported distinct iterator type: %T", input)
}
}
@ -292,14 +292,14 @@ func stringDistinctReduceSlice(a []StringPoint, opt *reduceOptions) []StringPoin
}
// newMeanIterator returns an iterator for operating on a mean() call.
func newMeanIterator(input Iterator, opt IteratorOptions) Iterator {
func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMeanReduce}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMeanReduce}, nil
case IntegerIterator:
return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMeanReduce}
return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMeanReduce}, nil
default:
panic(fmt.Sprintf("unsupported mean iterator type: %T", input))
return nil, fmt.Errorf("unsupported mean iterator type: %T", input)
}
}
@ -338,14 +338,14 @@ func integerMeanReduce(prev *FloatPoint, curr *IntegerPoint, opt *reduceOptions)
}
// newMedianIterator returns an iterator for operating on a median() call.
func newMedianIterator(input Iterator, opt IteratorOptions) Iterator {
func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMedianReduceSlice}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMedianReduceSlice}, nil
case IntegerIterator:
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMedianReduceSlice}
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMedianReduceSlice}, nil
default:
panic(fmt.Sprintf("unsupported median iterator type: %T", input))
return nil, fmt.Errorf("unsupported median iterator type: %T", input)
}
}
@ -386,16 +386,16 @@ func integerMedianReduceSlice(a []IntegerPoint, opt *reduceOptions) []FloatPoint
}
// newStddevIterator returns an iterator for operating on a stddev() call.
func newStddevIterator(input Iterator, opt IteratorOptions) Iterator {
func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatStddevReduceSlice}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatStddevReduceSlice}, nil
case IntegerIterator:
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerStddevReduceSlice}
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerStddevReduceSlice}, nil
case StringIterator:
return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringStddevReduceSlice}
return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringStddevReduceSlice}, nil
default:
panic(fmt.Sprintf("unsupported stddev iterator type: %T", input))
return nil, fmt.Errorf("unsupported stddev iterator type: %T", input)
}
}
@ -463,14 +463,14 @@ func stringStddevReduceSlice(a []StringPoint, opt *reduceOptions) []StringPoint
}
// newSpreadIterator returns an iterator for operating on a spread() call.
func newSpreadIterator(input Iterator, opt IteratorOptions) Iterator {
func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSpreadReduceSlice}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSpreadReduceSlice}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSpreadReduceSlice}
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSpreadReduceSlice}, nil
default:
panic(fmt.Sprintf("unsupported spread iterator type: %T", input))
return nil, fmt.Errorf("unsupported spread iterator type: %T", input)
}
}
@ -501,14 +501,14 @@ func integerSpreadReduceSlice(a []IntegerPoint, opt *reduceOptions) []IntegerPoi
}
// newTopIterator returns an iterator for operating on a top() call.
func newTopIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) Iterator {
func newTopIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
default:
panic(fmt.Sprintf("unsupported top iterator type: %T", input))
return nil, fmt.Errorf("unsupported top iterator type: %T", input)
}
}
@ -605,14 +605,14 @@ func newIntegerTopReduceSliceFunc(n int, tags []int, interval Interval) integerR
}
// newBottomIterator returns an iterator for operating on a bottom() call.
func newBottomIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) Iterator {
func newBottomIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
default:
panic(fmt.Sprintf("unsupported bottom iterator type: %T", input))
return nil, fmt.Errorf("unsupported bottom iterator type: %T", input)
}
}
@ -769,14 +769,14 @@ func filterIntegerByUniqueTags(a []IntegerPoint, tags []int, cmpFunc func(cur, p
}
// newPercentileIterator returns an iterator for operating on a percentile() call.
func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) Iterator {
func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatPercentileReduceSliceFunc(percentile)}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatPercentileReduceSliceFunc(percentile)}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerPercentileReduceSliceFunc(percentile)}
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerPercentileReduceSliceFunc(percentile)}, nil
default:
panic(fmt.Sprintf("unsupported percentile iterator type: %T", input))
return nil, fmt.Errorf("unsupported percentile iterator type: %T", input)
}
}
@ -811,14 +811,14 @@ func newIntegerPercentileReduceSliceFunc(percentile float64) integerReduceSliceF
}
// newDerivativeIterator returns an iterator for operating on a derivative() call.
func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) Iterator {
func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatDerivativeReduceSliceFunc(interval, isNonNegative)}
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatDerivativeReduceSliceFunc(interval, isNonNegative)}, nil
case IntegerIterator:
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerDerivativeReduceSliceFunc(interval, isNonNegative)}
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerDerivativeReduceSliceFunc(interval, isNonNegative)}, nil
default:
panic(fmt.Sprintf("unsupported derivative iterator type: %T", input))
return nil, fmt.Errorf("unsupported derivative iterator type: %T", input)
}
}

View File

@ -12,7 +12,7 @@ import (
// Ensure that a float iterator can be created for a count() call.
func TestCallIterator_Count_Float(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -44,7 +44,7 @@ func TestCallIterator_Count_Float(t *testing.T) {
// Ensure that an integer iterator can be created for a count() call.
func TestCallIterator_Count_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -76,7 +76,7 @@ func TestCallIterator_Count_Integer(t *testing.T) {
// Ensure that a float iterator can be created for a min() call.
func TestCallIterator_Min_Float(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -107,7 +107,7 @@ func TestCallIterator_Min_Float(t *testing.T) {
// Ensure that a integer iterator can be created for a min() call.
func TestCallIterator_Min_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -138,7 +138,7 @@ func TestCallIterator_Min_Integer(t *testing.T) {
// Ensure that a float iterator can be created for a max() call.
func TestCallIterator_Max_Float(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -168,7 +168,7 @@ func TestCallIterator_Max_Float(t *testing.T) {
// Ensure that a integer iterator can be created for a max() call.
func TestCallIterator_Max_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -198,7 +198,7 @@ func TestCallIterator_Max_Integer(t *testing.T) {
// Ensure that a float iterator can be created for a sum() call.
func TestCallIterator_Sum_Float(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -228,7 +228,7 @@ func TestCallIterator_Sum_Float(t *testing.T) {
// Ensure that an integer iterator can be created for a sum() call.
func TestCallIterator_Sum_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
@ -258,7 +258,7 @@ func TestCallIterator_Sum_Integer(t *testing.T) {
// Ensure that a float iterator can be created for a first() call.
func TestCallIterator_First_Float(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
@ -288,7 +288,7 @@ func TestCallIterator_First_Float(t *testing.T) {
// Ensure that an integer iterator can be created for a first() call.
func TestCallIterator_First_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
@ -318,7 +318,7 @@ func TestCallIterator_First_Integer(t *testing.T) {
// Ensure that a float iterator can be created for a last() call.
func TestCallIterator_Last_Float(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
@ -348,7 +348,7 @@ func TestCallIterator_Last_Float(t *testing.T) {
// Ensure that an integer iterator can be created for a last() call.
func TestCallIterator_Last_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
itr, _ := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
@ -376,18 +376,40 @@ func TestCallIterator_Last_Integer(t *testing.T) {
}
}
func TestNewCallIterator_UnsupportedExprName(t *testing.T) {
_, err := influxql.NewCallIterator(
&FloatIterator{},
influxql.IteratorOptions{
Expr: MustParseExpr(`foobar("value")`),
},
)
if err == nil || err.Error() != "unsupported function call: foobar" {
t.Errorf("unexpected error: %s", err)
}
}
func BenchmarkCallIterator_Min_Float(b *testing.B) {
input := GenerateFloatIterator(rand.New(rand.NewSource(0)), b.N)
b.ResetTimer()
b.ReportAllocs()
itr := influxql.NewCallIterator(input, influxql.IteratorOptions{
itr, err := influxql.NewCallIterator(input, influxql.IteratorOptions{
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
}).(influxql.FloatIterator)
for {
if p := itr.Next(); p == nil {
break
})
if err != nil {
b.Fatal(err)
}
switch itr := itr.(type) {
case influxql.FloatIterator:
for {
if p := itr.Next(); p == nil {
break
}
}
default:
b.Fatalf("incorrect iterator type: %T", itr)
}
}

View File

@ -209,104 +209,13 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter
case *Call:
// FIXME(benbjohnson): Validate that only calls with 1 arg are passed to IC.
var err error
var itr Iterator
switch expr.Name {
case "count":
switch arg := expr.Args[0].(type) {
case *Call:
if arg.Name == "distinct" {
input, err := buildExprIterator(arg, ic, opt)
if err != nil {
return nil, err
}
itr = newCountIterator(input, opt)
}
default:
itr, err = ic.CreateIterator(opt)
}
case "min", "max", "sum", "first", "last", "mean":
itr, err = ic.CreateIterator(opt)
case "distinct":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
return NewDistinctIterator(input, opt), nil
case "median":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
itr = newMedianIterator(input, opt)
case "stddev":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
itr = newStddevIterator(input, opt)
case "spread":
// OPTIMIZE(benbjohnson): convert to map/reduce
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
itr = newSpreadIterator(input, opt)
case "top":
var tags []int
if len(expr.Args) < 2 {
return nil, fmt.Errorf("top() requires 2 or more arguments, got %d", len(expr.Args))
} else if len(expr.Args) > 2 {
// We need to find the indices of where the tag values are stored in Aux
// This section is O(n^2), but for what should be a low value.
for i := 1; i < len(expr.Args)-1; i++ {
ref := expr.Args[i].(*VarRef)
for index, name := range opt.Aux {
if name == ref.Val {
tags = append(tags, index)
break
}
}
}
}
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
n := expr.Args[len(expr.Args)-1].(*NumberLiteral)
itr = newTopIterator(input, opt, n, tags)
case "bottom":
var tags []int
if len(expr.Args) < 2 {
return nil, fmt.Errorf("bottom() requires 2 or more arguments, got %d", len(expr.Args))
} else if len(expr.Args) > 2 {
// We need to find the indices of where the tag values are stored in Aux
// This section is O(n^2), but for what should be a low value.
for i := 1; i < len(expr.Args)-1; i++ {
ref := expr.Args[i].(*VarRef)
for index, name := range opt.Aux {
if name == ref.Val {
tags = append(tags, index)
break
}
}
}
}
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
n := expr.Args[len(expr.Args)-1].(*NumberLiteral)
itr = newBottomIterator(input, opt, n, tags)
case "percentile":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
percentile := expr.Args[1].(*NumberLiteral).Val
itr = newPercentileIterator(input, opt, percentile)
return NewDistinctIterator(input, opt)
case "derivative", "non_negative_derivative":
input, err := buildExprIterator(expr.Args[0], ic, opt)
if err != nil {
@ -319,19 +228,112 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter
// Derivatives do not use GROUP BY intervals or time constraints, so clear these options.
opt.Interval = Interval{}
opt.StartTime, opt.EndTime = MinTime, MaxTime
return newDerivativeIterator(input, opt, interval, isNonNegative), nil
return newDerivativeIterator(input, opt, interval, isNonNegative)
default:
return nil, fmt.Errorf("unsupported call: %s", expr.Name)
}
itr, err := func() (Iterator, error) {
switch expr.Name {
case "count":
switch arg := expr.Args[0].(type) {
case *Call:
if arg.Name == "distinct" {
input, err := buildExprIterator(arg, ic, opt)
if err != nil {
return nil, err
}
return newCountIterator(input, opt)
}
}
return ic.CreateIterator(opt)
case "min", "max", "sum", "first", "last", "mean":
return ic.CreateIterator(opt)
case "median":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
return newMedianIterator(input, opt)
case "stddev":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
return newStddevIterator(input, opt)
case "spread":
// OPTIMIZE(benbjohnson): convert to map/reduce
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
return newSpreadIterator(input, opt)
case "top":
var tags []int
if len(expr.Args) < 2 {
return nil, fmt.Errorf("top() requires 2 or more arguments, got %d", len(expr.Args))
} else if len(expr.Args) > 2 {
// We need to find the indices of where the tag values are stored in Aux
// This section is O(n^2), but for what should be a low value.
for i := 1; i < len(expr.Args)-1; i++ {
ref := expr.Args[i].(*VarRef)
for index, name := range opt.Aux {
if name == ref.Val {
tags = append(tags, index)
break
}
}
}
}
if err != nil {
return nil, err
}
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
n := expr.Args[len(expr.Args)-1].(*NumberLiteral)
return newTopIterator(input, opt, n, tags)
case "bottom":
var tags []int
if len(expr.Args) < 2 {
return nil, fmt.Errorf("bottom() requires 2 or more arguments, got %d", len(expr.Args))
} else if len(expr.Args) > 2 {
// We need to find the indices of where the tag values are stored in Aux
// This section is O(n^2), but for what should be a low value.
for i := 1; i < len(expr.Args)-1; i++ {
ref := expr.Args[i].(*VarRef)
for index, name := range opt.Aux {
if name == ref.Val {
tags = append(tags, index)
break
}
}
}
}
if !opt.Interval.IsZero() && opt.Fill != NoFill {
itr = NewFillIterator(itr, expr, opt)
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
n := expr.Args[len(expr.Args)-1].(*NumberLiteral)
return newBottomIterator(input, opt, n, tags)
case "percentile":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt)
if err != nil {
return nil, err
}
percentile := expr.Args[1].(*NumberLiteral).Val
return newPercentileIterator(input, opt, percentile)
default:
return nil, fmt.Errorf("unsupported call: %s", expr.Name)
}
}()
if err != nil {
return nil, err
}
if !opt.Interval.IsZero() && opt.Fill != NoFill {
itr = NewFillIterator(itr, expr, opt)
}
return itr, nil
}
return itr, nil
case *BinaryExpr:
if rhs, ok := expr.RHS.(Literal); ok {
// The right hand side is a literal. It is more common to have the RHS be a literal,

View File

@ -28,7 +28,7 @@ func TestSelect_Min(t *testing.T) {
{Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100},
}}, opt), nil
}}, opt)
}
// Execute selection.
@ -132,6 +132,24 @@ func TestSelect_Distinct_String(t *testing.T) {
}
}
// Ensure a SELECT distinct() query cannot be executed on booleans.
func TestSelect_Distinct_Boolean(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &BooleanIterator{}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT distinct(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
if err == nil || err.Error() != "unsupported distinct iterator type: *influxql_test.BooleanIterator" {
t.Errorf("unexpected error: %s", err)
}
if itrs != nil {
influxql.Iterators(itrs).Close()
}
}
// Ensure a SELECT mean() query can be executed.
func TestSelect_Mean_Float(t *testing.T) {
var ic IteratorCreator
@ -149,7 +167,7 @@ func TestSelect_Mean_Float(t *testing.T) {
{Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 4},
{Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4},
{Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5},
}}, opt), nil
}}, opt)
}
// Execute selection.
@ -184,7 +202,7 @@ func TestSelect_Mean_Integer(t *testing.T) {
{Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 4},
{Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4},
{Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5},
}}, opt), nil
}}, opt)
}
// Execute selection.
@ -202,6 +220,42 @@ func TestSelect_Mean_Integer(t *testing.T) {
}
}
// Ensure a SELECT mean() query cannot be executed on strings.
func TestSelect_Mean_String(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&StringIterator{}, opt)
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
if err == nil || err.Error() != "unsupported mean iterator type: *influxql_test.StringIterator" {
t.Errorf("unexpected error: %s", err)
}
if itrs != nil {
influxql.Iterators(itrs).Close()
}
}
// Ensure a SELECT mean() query cannot be executed on booleans.
func TestSelect_Mean_Boolean(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&BooleanIterator{}, opt)
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
if err == nil || err.Error() != "unsupported mean iterator type: *influxql_test.BooleanIterator" {
t.Errorf("unexpected error: %s", err)
}
if itrs != nil {
influxql.Iterators(itrs).Close()
}
}
// Ensure a SELECT median() query can be executed.
func TestSelect_Median_Float(t *testing.T) {
var ic IteratorCreator
@ -272,6 +326,42 @@ func TestSelect_Median_Integer(t *testing.T) {
}
}
// Ensure a SELECT median() query cannot be executed on strings.
func TestSelect_Median_String(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &StringIterator{}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT median(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
if err == nil || err.Error() != "unsupported median iterator type: *influxql_test.StringIterator" {
t.Errorf("unexpected error: %s", err)
}
if itrs != nil {
influxql.Iterators(itrs).Close()
}
}
// Ensure a SELECT median() query cannot be executed on booleans.
func TestSelect_Median_Boolean(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &BooleanIterator{}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT median(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
if err == nil || err.Error() != "unsupported median iterator type: *influxql_test.BooleanIterator" {
t.Errorf("unexpected error: %s", err)
}
if itrs != nil {
influxql.Iterators(itrs).Close()
}
}
// Ensure a SELECT top() query can be executed.
func TestSelect_Top_NoTags_Float(t *testing.T) {
var ic IteratorCreator
@ -774,7 +864,7 @@ func TestSelect_Fill_Null_Float(t *testing.T) {
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
}}, opt), nil
}}, opt)
}
// Execute selection.
@ -799,7 +889,7 @@ func TestSelect_Fill_Number_Float(t *testing.T) {
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
}}, opt), nil
}}, opt)
}
// Execute selection.
@ -824,7 +914,7 @@ func TestSelect_Fill_Previous_Float(t *testing.T) {
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
}}, opt), nil
}}, opt)
}
// Execute selection.
@ -1495,7 +1585,7 @@ func TestSelect_ParenExpr(t *testing.T) {
{Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100},
}}, opt), nil
}}, opt)
}
// Execute selection.

View File

@ -660,7 +660,7 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator
if err != nil {
return nil, err
}
return influxql.NewCallIterator(influxql.NewMergeIterator(inputs, opt), opt), nil
return influxql.NewCallIterator(influxql.NewMergeIterator(inputs, opt), opt)
}
itrs, err := e.createVarRefIterator(opt)

View File

@ -463,7 +463,7 @@ func (a Shards) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator,
}
}
}
return influxql.NewCallIterator(itr, opt), nil
return influxql.NewCallIterator(itr, opt)
}
// createSystemIterator returns an iterator for a system source.