Fix reduce iterators to separate by name

Previously reduce iterators just separated points by tags. If you had
identical tags but different names, it would group those together so you
could have these two points:

    cpu value=1
    mem value=2

When you performed a `mean(value)` call and included both cpu and mem as
sources, it would return one mem series with a value of 1.5 instead of
two serieses.
pull/5196/head
Jonathan A. Sternberg 2016-02-03 16:56:15 -05:00 committed by Ben Johnson
parent 2bdc9404ef
commit c602503c7c
3 changed files with 44 additions and 34 deletions

View File

@ -7,20 +7,22 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/pkg/deep"
)
// Ensure that a float iterator can be created for a count() call.
func TestCallIterator_Count_Float(t *testing.T) {
itr := influxql.NewCallIterator(
&FloatIterator{Points: []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "mem", Time: 23, Value: 10, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`count("value")`),
@ -29,12 +31,13 @@ func TestCallIterator_Count_Float(t *testing.T) {
},
)
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 3, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 1, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 1, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 1, Tags: ParseTags("host=hostB")},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA")}},
{&influxql.FloatPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB")}},
{&influxql.FloatPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB")}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -43,14 +46,15 @@ func TestCallIterator_Count_Float(t *testing.T) {
func TestCallIterator_Count_Integer(t *testing.T) {
itr := influxql.NewCallIterator(
&IntegerIterator{Points: []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "mem", Time: 23, Value: 10, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`count("value")`),
@ -59,12 +63,13 @@ func TestCallIterator_Count_Integer(t *testing.T) {
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 3, Tags: ParseTags("host=hostA")},
{Time: 0, Value: 1, Tags: ParseTags("host=hostB")},
{Time: 5, Value: 1, Tags: ParseTags("host=hostA")},
{Time: 20, Value: 1, Tags: ParseTags("host=hostB")},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA")}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB")}},
{&influxql.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA")}},
{&influxql.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB")}},
{&influxql.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB")}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

View File

@ -501,9 +501,10 @@ func (itr *floatReduceIterator) reduce() []*FloatPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[tags.ID()]
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
@ -512,7 +513,7 @@ func (itr *floatReduceIterator) reduce() []*FloatPoint {
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &FloatPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
m[id] = prev
}
prev.Time = t
prev.Value = v
@ -1178,9 +1179,10 @@ func (itr *integerReduceIterator) reduce() []*IntegerPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[tags.ID()]
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
@ -1189,7 +1191,7 @@ func (itr *integerReduceIterator) reduce() []*IntegerPoint {
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &IntegerPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
m[id] = prev
}
prev.Time = t
prev.Value = v
@ -1855,9 +1857,10 @@ func (itr *stringReduceIterator) reduce() []*StringPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[tags.ID()]
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
@ -1866,7 +1869,7 @@ func (itr *stringReduceIterator) reduce() []*StringPoint {
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &StringPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
m[id] = prev
}
prev.Time = t
prev.Value = v
@ -2532,9 +2535,10 @@ func (itr *booleanReduceIterator) reduce() []*BooleanPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[tags.ID()]
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
@ -2543,7 +2547,7 @@ func (itr *booleanReduceIterator) reduce() []*BooleanPoint {
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &BooleanPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
m[id] = prev
}
prev.Time = t
prev.Value = v

View File

@ -503,9 +503,10 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[tags.ID()]
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
@ -514,7 +515,7 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &{{.Name}}Point{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
m[id] = prev
}
prev.Time = t
prev.Value = v