Support derivative() call for integer fields in the new query engine

Fixes #5640.
pull/5652/head
Jonathan A. Sternberg 2016-02-12 11:23:52 -05:00
parent b273980bc1
commit 42b9166000
2 changed files with 92 additions and 0 deletions

View File

@ -804,6 +804,8 @@ func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interva
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatDerivativeReduceSliceFunc(interval, isNonNegative)}
case IntegerIterator:
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerDerivativeReduceSliceFunc(interval, isNonNegative)}
default:
panic(fmt.Sprintf("unsupported derivative iterator type: %T", input))
}
@ -851,6 +853,48 @@ func newFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) fl
}
}
// newIntegerDerivativeReduceSliceFunc returns the derivative value within a window.
func newIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) integerReduceSliceFloatFunc {
prev := IntegerPoint{Time: -1}
return func(a []IntegerPoint, opt *reduceOptions) []FloatPoint {
if len(a) == 0 {
return []FloatPoint{}
} else if len(a) == 1 {
return []FloatPoint{{Time: a[0].Time, Nil: true}}
}
if prev.Time == -1 {
prev = a[0]
}
output := make([]FloatPoint, 0, len(a)-1)
for i := 1; i < len(a); i++ {
p := &a[i]
// Calculate the derivative of successive points by dividing the
// difference of each value by the elapsed time normalized to the interval.
diff := float64(p.Value - prev.Value)
elapsed := p.Time - prev.Time
value := 0.0
if elapsed > 0 {
value = diff / (float64(elapsed) / float64(interval.Duration))
}
prev = *p
// Drop negative values for non-negative derivatives.
if isNonNegative && diff < 0 {
continue
}
output = append(output, FloatPoint{Time: p.Time, Value: value})
}
return output
}
}
// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result.
// This iterator receives an integer iterator but produces a float iterator.
type integerReduceSliceFloatIterator struct {

View File

@ -1474,3 +1474,51 @@ func TestSelect_ParenExpr(t *testing.T) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Derivative_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 8 * Second, Value: 19},
{Name: "cpu", Time: 12 * Second, Value: 3},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic)
if err != nil {
t.Fatal(err)
} else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
{&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 2.25}},
{&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: -4}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Derivative_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 8 * Second, Value: 19},
{Name: "cpu", Time: 12 * Second, Value: 3},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic)
if err != nil {
t.Fatal(err)
} else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
{&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 2.25}},
{&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: -4}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}