Fixed a bug in first() and last() where the time was lost

last() would always return the last output of the iterator (which isn't
necessarily the last time value due to how the merge iterator works) and
first() would always return the first output of the iterator (wrong for
the same reason).

Now the time is kept by the reduce function and the times are wiped as
part of the reduce iterator after the value has been found.
pull/5196/head
Jonathan A. Sternberg 2016-01-22 22:04:10 -05:00 committed by Ben Johnson
parent 67c1042435
commit 76b49b3ab3
4 changed files with 260 additions and 54 deletions

View File

@ -98,7 +98,7 @@ func newMinIterator(input Iterator, opt IteratorOptions) Iterator {
// floatMinReduce returns the minimum value between prev & curr.
func floatMinReduce(prev, curr *FloatPoint, opt *reduceOptions) (int64, float64, []interface{}) {
if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
@ -106,7 +106,7 @@ func floatMinReduce(prev, curr *FloatPoint, opt *reduceOptions) (int64, float64,
// integerMinReduce returns the minimum value between prev & curr.
func integerMinReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int64, []interface{}) {
if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
@ -126,7 +126,7 @@ func newMaxIterator(input Iterator, opt IteratorOptions) Iterator {
// floatMaxReduce returns the maximum value between prev & curr.
func floatMaxReduce(prev, curr *FloatPoint, opt *reduceOptions) (int64, float64, []interface{}) {
if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
@ -134,7 +134,7 @@ func floatMaxReduce(prev, curr *FloatPoint, opt *reduceOptions) (int64, float64,
// integerMaxReduce returns the maximum value between prev & curr.
func integerMaxReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int64, []interface{}) {
if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
@ -154,17 +154,17 @@ func newSumIterator(input Iterator, opt IteratorOptions) Iterator {
// floatSumReduce returns the sum prev value & curr value.
func floatSumReduce(prev, curr *FloatPoint, opt *reduceOptions) (int64, float64, []interface{}) {
if prev == nil {
return opt.startTime, curr.Value, nil
return curr.Time, curr.Value, nil
}
return opt.startTime, prev.Value + curr.Value, nil
return prev.Time, prev.Value + curr.Value, nil
}
// integerSumReduce returns the sum prev value & curr value.
func integerSumReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int64, []interface{}) {
if prev == nil {
return opt.startTime, curr.Value, nil
return curr.Time, curr.Value, nil
}
return opt.startTime, prev.Value + curr.Value, nil
return prev.Time, prev.Value + curr.Value, nil
}
// newFirstIterator returns an iterator for operating on a first() call.
@ -182,17 +182,17 @@ func newFirstIterator(input Iterator, opt IteratorOptions) Iterator {
// floatFirstReduce returns the first point sorted by time.
func floatFirstReduce(prev, curr *FloatPoint, opt *reduceOptions) (int64, float64, []interface{}) {
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return opt.startTime, prev.Value, prev.Aux
return prev.Time, prev.Value, prev.Aux
}
// integerFirstReduce returns the first point sorted by time.
func integerFirstReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int64, []interface{}) {
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return opt.startTime, prev.Value, prev.Aux
return prev.Time, prev.Value, prev.Aux
}
// newLastIterator returns an iterator for operating on a last() call.
@ -210,17 +210,17 @@ func newLastIterator(input Iterator, opt IteratorOptions) Iterator {
// floatLastReduce returns the last point sorted by time.
func floatLastReduce(prev, curr *FloatPoint, opt *reduceOptions) (int64, float64, []interface{}) {
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return opt.startTime, prev.Value, prev.Aux
return prev.Time, prev.Value, prev.Aux
}
// integerLastReduce returns the last point sorted by time.
func integerLastReduce(prev, curr *IntegerPoint, opt *reduceOptions) (int64, int64, []interface{}) {
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return opt.startTime, curr.Value, curr.Aux
return curr.Time, curr.Value, curr.Aux
}
return opt.startTime, prev.Value, prev.Aux
return prev.Time, prev.Value, prev.Aux
}
// newDistinctIterator returns an iterator for operating on a distinct() call.

View File

@ -7,10 +7,9 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/pkg/deep"
)
// Ensure that an iterator can be created for a count() call.
// Ensure that a float iterator can be created for a count() call.
func TestCallIterator_Count_Float(t *testing.T) {
itr := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
@ -30,12 +29,42 @@ func TestCallIterator_Count_Float(t *testing.T) {
},
)
if a := (Iterators{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 3, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 0, Value: 1, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Time: 5, Value: 1, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 20, Value: 1, Tags: ParseTags("host=hostB")}},
}) {
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 3, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 1, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 1, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 1, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that an integer iterator can be created for a count() call.
func TestCallIterator_Count_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`count("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 3, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 1, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 1, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 1, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -47,6 +76,7 @@ func TestCallIterator_Min_Float(t *testing.T) {
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 4, Value: 12, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
@ -60,12 +90,43 @@ func TestCallIterator_Min_Float(t *testing.T) {
},
)
if a := (Iterators{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}},
}) {
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a integer iterator can be created for a min() call.
func TestCallIterator_Min_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 4, Value: 12, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`min("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -90,12 +151,42 @@ func TestCallIterator_Max_Float(t *testing.T) {
},
)
if a := (Iterators{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}},
}) {
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a integer iterator can be created for a max() call.
func TestCallIterator_Max_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`max("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -120,12 +211,42 @@ func TestCallIterator_Sum_Float(t *testing.T) {
},
)
if a := (Iterators{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 35, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}},
}) {
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 35, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that an integer iterator can be created for a sum() call.
func TestCallIterator_Sum_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`sum("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 35, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -150,12 +271,42 @@ func TestCallIterator_First_Float(t *testing.T) {
},
)
if a := (Iterators{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}},
}) {
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that an integer iterator can be created for a first() call.
func TestCallIterator_First_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`first("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -180,12 +331,42 @@ func TestCallIterator_Last_Float(t *testing.T) {
},
)
if a := (Iterators{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}},
}) {
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that an integer iterator can be created for a last() call.
func TestCallIterator_Last_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`last("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB")},
}); !ok {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

View File

@ -570,6 +570,11 @@ func (itr *floatReduceIterator) reduce() []*FloatPoint {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}
@ -1281,6 +1286,11 @@ func (itr *integerReduceIterator) reduce() []*IntegerPoint {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}
@ -1992,6 +2002,11 @@ func (itr *stringReduceIterator) reduce() []*StringPoint {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}
@ -2703,6 +2718,11 @@ func (itr *booleanReduceIterator) reduce() []*BooleanPoint {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}

View File

@ -572,6 +572,11 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}