More work on improving the iterator unit tests

pull/5196/head
Jonathan A. Sternberg 2016-01-22 15:38:59 -05:00 committed by Ben Johnson
parent 3dd6aa17f3
commit 0e1910cb92
5 changed files with 309 additions and 127 deletions

View File

@ -17,17 +17,6 @@ type FloatIterator interface {
Next() *FloatPoint Next() *FloatPoint
} }
// FloatIterators represents a list of float iterators.
type FloatIterators []FloatIterator
// Close closes all iterators.
func (a FloatIterators) Close() error {
for _, itr := range a {
itr.Close()
}
return nil
}
// newFloatIterators converts a slice of Iterator to a slice of FloatIterator. // newFloatIterators converts a slice of Iterator to a slice of FloatIterator.
// Panic if any iterator in itrs is not a FloatIterator. // Panic if any iterator in itrs is not a FloatIterator.
func newFloatIterators(itrs []Iterator) []FloatIterator { func newFloatIterators(itrs []Iterator) []FloatIterator {
@ -139,7 +128,9 @@ func newFloatMergeIterator(inputs []FloatIterator, opt IteratorOptions) *floatMe
// Close closes the underlying iterators. // Close closes the underlying iterators.
func (itr *floatMergeIterator) Close() error { func (itr *floatMergeIterator) Close() error {
for _, input := range itr.inputs { for _, input := range itr.inputs {
return input.Close() if err := input.Close(); err != nil {
return err
}
} }
return nil return nil
} }
@ -754,17 +745,6 @@ type IntegerIterator interface {
Next() *IntegerPoint Next() *IntegerPoint
} }
// IntegerIterators represents a list of integer iterators.
type IntegerIterators []IntegerIterator
// Close closes all iterators.
func (a IntegerIterators) Close() error {
for _, itr := range a {
itr.Close()
}
return nil
}
// newIntegerIterators converts a slice of Iterator to a slice of IntegerIterator. // newIntegerIterators converts a slice of Iterator to a slice of IntegerIterator.
// Panic if any iterator in itrs is not a IntegerIterator. // Panic if any iterator in itrs is not a IntegerIterator.
func newIntegerIterators(itrs []Iterator) []IntegerIterator { func newIntegerIterators(itrs []Iterator) []IntegerIterator {
@ -876,7 +856,9 @@ func newIntegerMergeIterator(inputs []IntegerIterator, opt IteratorOptions) *int
// Close closes the underlying iterators. // Close closes the underlying iterators.
func (itr *integerMergeIterator) Close() error { func (itr *integerMergeIterator) Close() error {
for _, input := range itr.inputs { for _, input := range itr.inputs {
return input.Close() if err := input.Close(); err != nil {
return err
}
} }
return nil return nil
} }
@ -1491,17 +1473,6 @@ type StringIterator interface {
Next() *StringPoint Next() *StringPoint
} }
// StringIterators represents a list of string iterators.
type StringIterators []StringIterator
// Close closes all iterators.
func (a StringIterators) Close() error {
for _, itr := range a {
itr.Close()
}
return nil
}
// newStringIterators converts a slice of Iterator to a slice of StringIterator. // newStringIterators converts a slice of Iterator to a slice of StringIterator.
// Panic if any iterator in itrs is not a StringIterator. // Panic if any iterator in itrs is not a StringIterator.
func newStringIterators(itrs []Iterator) []StringIterator { func newStringIterators(itrs []Iterator) []StringIterator {
@ -1613,7 +1584,9 @@ func newStringMergeIterator(inputs []StringIterator, opt IteratorOptions) *strin
// Close closes the underlying iterators. // Close closes the underlying iterators.
func (itr *stringMergeIterator) Close() error { func (itr *stringMergeIterator) Close() error {
for _, input := range itr.inputs { for _, input := range itr.inputs {
return input.Close() if err := input.Close(); err != nil {
return err
}
} }
return nil return nil
} }
@ -2228,17 +2201,6 @@ type BooleanIterator interface {
Next() *BooleanPoint Next() *BooleanPoint
} }
// BooleanIterators represents a list of boolean iterators.
type BooleanIterators []BooleanIterator
// Close closes all iterators.
func (a BooleanIterators) Close() error {
for _, itr := range a {
itr.Close()
}
return nil
}
// newBooleanIterators converts a slice of Iterator to a slice of BooleanIterator. // newBooleanIterators converts a slice of Iterator to a slice of BooleanIterator.
// Panic if any iterator in itrs is not a BooleanIterator. // Panic if any iterator in itrs is not a BooleanIterator.
func newBooleanIterators(itrs []Iterator) []BooleanIterator { func newBooleanIterators(itrs []Iterator) []BooleanIterator {
@ -2350,7 +2312,9 @@ func newBooleanMergeIterator(inputs []BooleanIterator, opt IteratorOptions) *boo
// Close closes the underlying iterators. // Close closes the underlying iterators.
func (itr *booleanMergeIterator) Close() error { func (itr *booleanMergeIterator) Close() error {
for _, input := range itr.inputs { for _, input := range itr.inputs {
return input.Close() if err := input.Close(); err != nil {
return err
}
} }
return nil return nil
} }

View File

@ -16,17 +16,6 @@ type {{.Name}}Iterator interface {
Next() *{{.Name}}Point Next() *{{.Name}}Point
} }
// {{.Name}}Iterators represents a list of {{.name}} iterators.
type {{.Name}}Iterators []{{.Name}}Iterator
// Close closes all iterators.
func (a {{.Name}}Iterators) Close() error {
for _, itr := range a {
itr.Close()
}
return nil
}
// new{{.Name}}Iterators converts a slice of Iterator to a slice of {{.Name}}Iterator. // new{{.Name}}Iterators converts a slice of Iterator to a slice of {{.Name}}Iterator.
// Panic if any iterator in itrs is not a {{.Name}}Iterator. // Panic if any iterator in itrs is not a {{.Name}}Iterator.
func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator { func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator {
@ -137,9 +126,9 @@ func new{{.Name}}MergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions)
} }
// Close closes the underlying iterators. // Close closes the underlying iterators.
func (itr *{{.name}}MergeIterator) Close() error { func (itr *{{.name}}MergeIterator) Close() error {
for _, input := range itr.inputs { for _, input := range itr.inputs {
return input.Close() input.Close()
} }
return nil return nil
} }

View File

@ -14,14 +14,15 @@ import (
// Test implementation of influxql.FloatIterator // Test implementation of influxql.FloatIterator
type FloatIterator struct { type FloatIterator struct {
Points []influxql.FloatPoint Points []influxql.FloatPoint
Closed bool
} }
// Close is a no-op. // Close is a no-op.
func (itr *FloatIterator) Close() error { return nil } func (itr *FloatIterator) Close() error { itr.Closed = true; return nil }
// Next returns the next value and shifts it off the beginning of the points slice. // Next returns the next value and shifts it off the beginning of the points slice.
func (itr *FloatIterator) Next() *influxql.FloatPoint { func (itr *FloatIterator) Next() *influxql.FloatPoint {
if len(itr.Points) == 0 { if len(itr.Points) == 0 || itr.Closed {
return nil return nil
} }
@ -31,34 +32,46 @@ func (itr *FloatIterator) Next() *influxql.FloatPoint {
} }
type TestFloatIterator struct { type TestFloatIterator struct {
Iterator influxql.Iterator Inputs []*FloatIterator
Points []influxql.FloatPoint IteratorFn func(itrs []influxql.Iterator) influxql.Iterator
Points []influxql.FloatPoint
} }
func (ti *TestFloatIterator) run(t *testing.T) { func (ti *TestFloatIterator) run(t *testing.T) {
itr := ti.Iterator.(influxql.FloatIterator) itrs := make([]influxql.Iterator, 0, len(ti.Inputs))
for _, itr := range ti.Inputs {
itrs = append(itrs, influxql.Iterator(itr))
}
itr := ti.IteratorFn(itrs).(influxql.FloatIterator)
points := make([]influxql.FloatPoint, 0, len(ti.Points)) points := make([]influxql.FloatPoint, 0, len(ti.Points))
for p := itr.Next(); p != nil; p = itr.Next() { for p := itr.Next(); p != nil; p = itr.Next() {
points = append(points, *p) points = append(points, *p)
} }
itr.Close()
if !deep.Equal(ti.Points, points) { if !deep.Equal(ti.Points, points) {
t.Fatalf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points)) t.Errorf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points))
}
for i, itr := range ti.Inputs {
if !itr.Closed {
t.Errorf("iterator %d not closed", i)
}
} }
} }
// Test implementation of influxql.IntegerIterator // Test implementation of influxql.IntegerIterator
type IntegerIterator struct { type IntegerIterator struct {
Points []influxql.IntegerPoint Points []influxql.IntegerPoint
Closed bool
} }
// Close is a no-op. // Close is a no-op.
func (itr *IntegerIterator) Close() error { return nil } func (itr *IntegerIterator) Close() error { itr.Closed = true; return nil }
// Next returns the next value and shifts it off the beginning of the points slice. // Next returns the next value and shifts it off the beginning of the points slice.
func (itr *IntegerIterator) Next() *influxql.IntegerPoint { func (itr *IntegerIterator) Next() *influxql.IntegerPoint {
if len(itr.Points) == 0 { if len(itr.Points) == 0 || itr.Closed {
return nil return nil
} }
@ -68,34 +81,46 @@ func (itr *IntegerIterator) Next() *influxql.IntegerPoint {
} }
type TestIntegerIterator struct { type TestIntegerIterator struct {
Iterator influxql.Iterator Inputs []*IntegerIterator
Points []influxql.IntegerPoint IteratorFn func(itrs []influxql.Iterator) influxql.Iterator
Points []influxql.IntegerPoint
} }
func (ti *TestIntegerIterator) run(t *testing.T) { func (ti *TestIntegerIterator) run(t *testing.T) {
itr := ti.Iterator.(influxql.IntegerIterator) itrs := make([]influxql.Iterator, 0, len(ti.Inputs))
for _, itr := range ti.Inputs {
itrs = append(itrs, influxql.Iterator(itr))
}
itr := ti.IteratorFn(itrs).(influxql.IntegerIterator)
points := make([]influxql.IntegerPoint, 0, len(ti.Points)) points := make([]influxql.IntegerPoint, 0, len(ti.Points))
for p := itr.Next(); p != nil; p = itr.Next() { for p := itr.Next(); p != nil; p = itr.Next() {
points = append(points, *p) points = append(points, *p)
} }
itr.Close()
if !deep.Equal(ti.Points, points) { if !deep.Equal(ti.Points, points) {
t.Fatalf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points)) t.Errorf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points))
}
for i, itr := range ti.Inputs {
if !itr.Closed {
t.Errorf("iterator %d not closed", i)
}
} }
} }
// Test implementation of influxql.StringIterator // Test implementation of influxql.StringIterator
type StringIterator struct { type StringIterator struct {
Points []influxql.StringPoint Points []influxql.StringPoint
Closed bool
} }
// Close is a no-op. // Close is a no-op.
func (itr *StringIterator) Close() error { return nil } func (itr *StringIterator) Close() error { itr.Closed = true; return nil }
// Next returns the next value and shifts it off the beginning of the points slice. // Next returns the next value and shifts it off the beginning of the points slice.
func (itr *StringIterator) Next() *influxql.StringPoint { func (itr *StringIterator) Next() *influxql.StringPoint {
if len(itr.Points) == 0 { if len(itr.Points) == 0 || itr.Closed {
return nil return nil
} }
@ -105,34 +130,46 @@ func (itr *StringIterator) Next() *influxql.StringPoint {
} }
type TestStringIterator struct { type TestStringIterator struct {
Iterator influxql.Iterator Inputs []*StringIterator
Points []influxql.StringPoint IteratorFn func(itrs []influxql.Iterator) influxql.Iterator
Points []influxql.StringPoint
} }
func (ti *TestStringIterator) run(t *testing.T) { func (ti *TestStringIterator) run(t *testing.T) {
itr := ti.Iterator.(influxql.StringIterator) itrs := make([]influxql.Iterator, 0, len(ti.Inputs))
for _, itr := range ti.Inputs {
itrs = append(itrs, influxql.Iterator(itr))
}
itr := ti.IteratorFn(itrs).(influxql.StringIterator)
points := make([]influxql.StringPoint, 0, len(ti.Points)) points := make([]influxql.StringPoint, 0, len(ti.Points))
for p := itr.Next(); p != nil; p = itr.Next() { for p := itr.Next(); p != nil; p = itr.Next() {
points = append(points, *p) points = append(points, *p)
} }
itr.Close()
if !deep.Equal(ti.Points, points) { if !deep.Equal(ti.Points, points) {
t.Fatalf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points)) t.Errorf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points))
}
for i, itr := range ti.Inputs {
if !itr.Closed {
t.Errorf("iterator %d not closed", i)
}
} }
} }
// Test implementation of influxql.BooleanIterator // Test implementation of influxql.BooleanIterator
type BooleanIterator struct { type BooleanIterator struct {
Points []influxql.BooleanPoint Points []influxql.BooleanPoint
Closed bool
} }
// Close is a no-op. // Close is a no-op.
func (itr *BooleanIterator) Close() error { return nil } func (itr *BooleanIterator) Close() error { itr.Closed = true; return nil }
// Next returns the next value and shifts it off the beginning of the points slice. // Next returns the next value and shifts it off the beginning of the points slice.
func (itr *BooleanIterator) Next() *influxql.BooleanPoint { func (itr *BooleanIterator) Next() *influxql.BooleanPoint {
if len(itr.Points) == 0 { if len(itr.Points) == 0 || itr.Closed {
return nil return nil
} }
@ -142,19 +179,30 @@ func (itr *BooleanIterator) Next() *influxql.BooleanPoint {
} }
type TestBooleanIterator struct { type TestBooleanIterator struct {
Iterator influxql.Iterator Inputs []*BooleanIterator
Points []influxql.BooleanPoint IteratorFn func(itrs []influxql.Iterator) influxql.Iterator
Points []influxql.BooleanPoint
} }
func (ti *TestBooleanIterator) run(t *testing.T) { func (ti *TestBooleanIterator) run(t *testing.T) {
itr := ti.Iterator.(influxql.BooleanIterator) itrs := make([]influxql.Iterator, 0, len(ti.Inputs))
for _, itr := range ti.Inputs {
itrs = append(itrs, influxql.Iterator(itr))
}
itr := ti.IteratorFn(itrs).(influxql.BooleanIterator)
points := make([]influxql.BooleanPoint, 0, len(ti.Points)) points := make([]influxql.BooleanPoint, 0, len(ti.Points))
for p := itr.Next(); p != nil; p = itr.Next() { for p := itr.Next(); p != nil; p = itr.Next() {
points = append(points, *p) points = append(points, *p)
} }
itr.Close()
if !deep.Equal(ti.Points, points) { if !deep.Equal(ti.Points, points) {
t.Fatalf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points)) t.Errorf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points))
}
for i, itr := range ti.Inputs {
if !itr.Closed {
t.Errorf("iterator %d not closed", i)
}
} }
} }

View File

@ -13,14 +13,15 @@ import (
// Test implementation of influxql.{{.Name}}Iterator // Test implementation of influxql.{{.Name}}Iterator
type {{.Name}}Iterator struct { type {{.Name}}Iterator struct {
Points []influxql.{{.Name}}Point Points []influxql.{{.Name}}Point
Closed bool
} }
// Close is a no-op. // Close is a no-op.
func (itr *{{.Name}}Iterator) Close() error { return nil } func (itr *{{.Name}}Iterator) Close() error { itr.Closed = true; return nil }
// Next returns the next value and shifts it off the beginning of the points slice. // Next returns the next value and shifts it off the beginning of the points slice.
func (itr *{{.Name}}Iterator) Next() *influxql.{{.Name}}Point { func (itr *{{.Name}}Iterator) Next() *influxql.{{.Name}}Point {
if len(itr.Points) == 0 { if len(itr.Points) == 0 || itr.Closed {
return nil return nil
} }
@ -30,20 +31,31 @@ func (itr *{{.Name}}Iterator) Next() *influxql.{{.Name}}Point {
} }
type Test{{.Name}}Iterator struct { type Test{{.Name}}Iterator struct {
Iterator influxql.Iterator Inputs []*{{.Name}}Iterator
Points []influxql.{{.Name}}Point IteratorFn func(itrs []influxql.Iterator) influxql.Iterator
Points []influxql.{{.Name}}Point
} }
func (ti *Test{{.Name}}Iterator) run(t *testing.T) { func (ti *Test{{.Name}}Iterator) run(t *testing.T) {
itr := ti.Iterator.(influxql.{{.Name}}Iterator) itrs := make([]influxql.Iterator, 0, len(ti.Inputs))
for _, itr := range ti.Inputs {
itrs = append(itrs, influxql.Iterator(itr))
}
itr := ti.IteratorFn(itrs).(influxql.{{.Name}}Iterator)
points := make([]influxql.{{.Name}}Point, 0, len(ti.Points)) points := make([]influxql.{{.Name}}Point, 0, len(ti.Points))
for p := itr.Next(); p != nil; p = itr.Next() { for p := itr.Next(); p != nil; p = itr.Next() {
points = append(points, *p) points = append(points, *p)
} }
itr.Close()
if !deep.Equal(ti.Points, points) { if !deep.Equal(ti.Points, points) {
t.Fatalf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points)) t.Errorf("unexpected points: %s %s", spew.Sdump(points), spew.Sdump(ti.Points))
}
for i, itr := range ti.Inputs {
if !itr.Closed {
t.Errorf("iterator %d not closed", i)
}
} }
} }
{{end}} {{end}}

View File

@ -18,24 +18,28 @@ import (
// Ensure that a set of iterators can be merged together, sorted by window and name/tag. // Ensure that a set of iterators can be merged together, sorted by window and name/tag.
func TestMergeIterator_Float(t *testing.T) { func TestMergeIterator_Float(t *testing.T) {
test := TestFloatIterator{ test := TestFloatIterator{
Iterator: influxql.NewMergeIterator([]influxql.Iterator{ Inputs: []*FloatIterator{
&FloatIterator{Points: []influxql.FloatPoint{ {Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
}}, }},
&FloatIterator{Points: []influxql.FloatPoint{ {Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
}}, }},
}, influxql.IteratorOptions{ {Points: []influxql.FloatPoint{}},
Interval: influxql.Interval{ },
Duration: 10 * time.Nanosecond, IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
}, return influxql.NewMergeIterator(itrs, influxql.IteratorOptions{
Ascending: true, Interval: influxql.Interval{
}), Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.FloatPoint{ Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
@ -52,24 +56,28 @@ func TestMergeIterator_Float(t *testing.T) {
// Ensure that a set of iterators can be merged together, sorted by window and name/tag. // Ensure that a set of iterators can be merged together, sorted by window and name/tag.
func TestMergeIterator_Integer(t *testing.T) { func TestMergeIterator_Integer(t *testing.T) {
test := TestIntegerIterator{ test := TestIntegerIterator{
Iterator: influxql.NewMergeIterator([]influxql.Iterator{ Inputs: []*IntegerIterator{
&IntegerIterator{Points: []influxql.IntegerPoint{ {Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
}}, }},
&IntegerIterator{Points: []influxql.IntegerPoint{ {Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
}}, }},
}, influxql.IteratorOptions{ {Points: []influxql.IntegerPoint{}},
Interval: influxql.Interval{ },
Duration: 10 * time.Nanosecond, IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
}, return influxql.NewMergeIterator(itrs, influxql.IteratorOptions{
Ascending: true, Interval: influxql.Interval{
}), Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.IntegerPoint{ Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
@ -86,24 +94,28 @@ func TestMergeIterator_Integer(t *testing.T) {
// Ensure that a set of iterators can be merged together, sorted by window and name/tag. // Ensure that a set of iterators can be merged together, sorted by window and name/tag.
func TestMergeIterator_String(t *testing.T) { func TestMergeIterator_String(t *testing.T) {
test := TestStringIterator{ test := TestStringIterator{
Iterator: influxql.NewMergeIterator([]influxql.Iterator{ Inputs: []*StringIterator{
&StringIterator{Points: []influxql.StringPoint{ {Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"},
}}, }},
&StringIterator{Points: []influxql.StringPoint{ {Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"},
}}, }},
}, influxql.IteratorOptions{ {Points: []influxql.StringPoint{}},
Interval: influxql.Interval{ },
Duration: 10 * time.Nanosecond, IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
}, return influxql.NewMergeIterator(itrs, influxql.IteratorOptions{
Ascending: true, Interval: influxql.Interval{
}), Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.StringPoint{ Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"},
@ -120,24 +132,28 @@ func TestMergeIterator_String(t *testing.T) {
// Ensure that a set of iterators can be merged together, sorted by window and name/tag. // Ensure that a set of iterators can be merged together, sorted by window and name/tag.
func TestMergeIterator_Boolean(t *testing.T) { func TestMergeIterator_Boolean(t *testing.T) {
test := TestBooleanIterator{ test := TestBooleanIterator{
Iterator: influxql.NewMergeIterator([]influxql.Iterator{ Inputs: []*BooleanIterator{
&BooleanIterator{Points: []influxql.BooleanPoint{ {Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false},
}}, }},
&BooleanIterator{Points: []influxql.BooleanPoint{ {Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true},
}}, }},
}, influxql.IteratorOptions{ {Points: []influxql.BooleanPoint{}},
Interval: influxql.Interval{ },
Duration: 10 * time.Nanosecond, IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
}, return influxql.NewMergeIterator(itrs, influxql.IteratorOptions{
Ascending: true, Interval: influxql.Interval{
}), Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.BooleanPoint{ Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true}, {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false}, {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false},
@ -158,6 +174,159 @@ func TestMergeIterator_Nil(t *testing.T) {
} }
} }
// Ensure that a set of iterators can be merged together, sorted by name/tag.
func TestSortedMergeIterator_Float(t *testing.T) {
test := TestFloatIterator{
Inputs: []*FloatIterator{
{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
}},
{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
}},
{Points: []influxql.FloatPoint{}},
},
IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
return influxql.NewSortedMergeIterator(itrs,
influxql.IteratorOptions{
Interval: influxql.Interval{
Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
},
}
test.run(t)
}
// Ensure that a set of iterators can be merged together, sorted by name/tag.
func TestSortedMergeIterator_Integer(t *testing.T) {
test := TestIntegerIterator{
Inputs: []*IntegerIterator{
{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
}},
{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
}},
{Points: []influxql.IntegerPoint{}},
},
IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
return influxql.NewSortedMergeIterator(itrs, influxql.IteratorOptions{
Interval: influxql.Interval{
Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
},
}
test.run(t)
}
// Ensure that a set of iterators can be merged together, sorted by name/tag.
func TestSortedMergeIterator_String(t *testing.T) {
test := TestStringIterator{
Inputs: []*StringIterator{
{Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"},
}},
{Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"},
}},
{Points: []influxql.StringPoint{}},
},
IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
return influxql.NewSortedMergeIterator(itrs, influxql.IteratorOptions{
Interval: influxql.Interval{
Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"},
},
}
test.run(t)
}
// Ensure that a set of iterators can be merged together, sorted by name/tag.
func TestSortedMergeIterator_Boolean(t *testing.T) {
test := TestBooleanIterator{
Inputs: []*BooleanIterator{
{Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false},
}},
{Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false},
}},
{Points: []influxql.BooleanPoint{}},
},
IteratorFn: func(itrs []influxql.Iterator) influxql.Iterator {
return influxql.NewSortedMergeIterator(itrs, influxql.IteratorOptions{
Interval: influxql.Interval{
Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
},
Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false},
},
}
test.run(t)
}
func TestSortedMergeIterator(t *testing.T) { func TestSortedMergeIterator(t *testing.T) {
itr := influxql.NewSortedMergeIterator([]influxql.Iterator{ itr := influxql.NewSortedMergeIterator([]influxql.Iterator{
&FloatIterator{Points: []influxql.FloatPoint{ &FloatIterator{Points: []influxql.FloatPoint{