Support all iterator types for count(), first(), and last()

All three of these iterators are supposed to support all four types of
iterators, but the implementation was never done for string or boolean.

Fixes #5886.
pull/5895/head
Jonathan A. Sternberg 2016-03-02 18:42:00 -05:00
parent 2440568b27
commit e3660fae93
6 changed files with 1959 additions and 392 deletions

View File

@ -62,24 +62,36 @@ func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatFuncReducer(floatCountReduce)
createFn := func() (FloatPointAggregator, IntegerPointEmitter) {
fn := NewFloatFuncIntegerReducer(floatCountReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceIntegerIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerCountReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, IntegerPointEmitter) {
fn := NewStringFuncIntegerReducer(stringCountReduce)
return fn, fn
}
return &stringReduceIntegerIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, IntegerPointEmitter) {
fn := NewBooleanFuncIntegerReducer(booleanCountReduce)
return fn, fn
}
return &booleanReduceIntegerIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported count iterator type: %T", input)
}
}
// floatCountReduce returns the count of points.
func floatCountReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
func floatCountReduce(prev *IntegerPoint, curr *FloatPoint) (int64, int64, []interface{}) {
if prev == nil {
return ZeroTime, 1, nil
}
@ -94,6 +106,22 @@ func integerCountReduce(prev, curr *IntegerPoint) (int64, int64, []interface{})
return ZeroTime, prev.Value + 1, nil
}
// stringCountReduce returns the count of points.
func stringCountReduce(prev *IntegerPoint, curr *StringPoint) (int64, int64, []interface{}) {
if prev == nil {
return ZeroTime, 1, nil
}
return ZeroTime, prev.Value + 1, nil
}
// booleanCountReduce returns the count of points.
func booleanCountReduce(prev *IntegerPoint, curr *BooleanPoint) (int64, int64, []interface{}) {
if prev == nil {
return ZeroTime, 1, nil
}
return ZeroTime, prev.Value + 1, nil
}
// newMinIterator returns an iterator for operating on a min() call.
func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
@ -102,13 +130,13 @@ func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatMinReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerMinReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported min iterator type: %T", input)
}
@ -138,13 +166,13 @@ func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatMaxReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerMaxReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported max iterator type: %T", input)
}
@ -174,13 +202,13 @@ func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatSumReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerSumReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported sum iterator type: %T", input)
}
@ -210,13 +238,25 @@ func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatFirstReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerFirstReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringFuncReducer(stringFirstReduce)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanFuncReducer(booleanFirstReduce)
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported first iterator type: %T", input)
}
@ -238,6 +278,22 @@ func integerFirstReduce(prev, curr *IntegerPoint) (int64, int64, []interface{})
return prev.Time, prev.Value, prev.Aux
}
// stringFirstReduce returns the first point sorted by time.
func stringFirstReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// booleanFirstReduce returns the first point sorted by time.
func booleanFirstReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && !curr.Value && prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// newLastIterator returns an iterator for operating on a last() call.
func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
@ -246,13 +302,25 @@ func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatLastReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerLastReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringFuncReducer(stringLastReduce)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanFuncReducer(booleanLastReduce)
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported last iterator type: %T", input)
}
@ -274,6 +342,22 @@ func integerLastReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
return prev.Time, prev.Value, prev.Aux
}
// stringLastReduce returns the first point sorted by time.
func stringLastReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// booleanLastReduce returns the first point sorted by time.
func booleanLastReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value && !prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// NewDistinctIterator returns an iterator for operating on a distinct() call.
func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
@ -347,7 +431,7 @@ func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatMeanReducer()
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerMeanReducer()
@ -928,94 +1012,6 @@ func newIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool)
}
}
type integerReduceFloatIterator struct {
input *bufIntegerIterator
create func() (IntegerPointAggregator, FloatPointEmitter)
opt IteratorOptions
points []*FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceFloatIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceFloatIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// floatReduceIntegerPoint stores the reduced data for a name/tag combination.
type integerReduceFloatPoint struct {
Name string
Tags Tags
Aggregator IntegerPointAggregator
Emitter FloatPointEmitter
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceFloatIterator) reduce() []*FloatPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
// Create points by tags.
m := make(map[string]*integerReduceFloatPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &integerReduceFloatPoint{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
Emitter: emitter,
}
m[id] = rp
}
rp.Aggregator.Aggregate(curr)
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*FloatPoint, len(m))
for i, k := range keys {
rp := m[k]
p := rp.Emitter.Emit()
p.Name = rp.Name
p.Tags = rp.Tags
p.Time = startTime
a[i] = p
}
return a
}
// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result.
// This iterator receives an integer iterator but produces a float iterator.
type integerReduceSliceFloatIterator struct {

View File

@ -32,11 +32,11 @@ func TestCallIterator_Count_Float(t *testing.T) {
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
@ -74,6 +74,70 @@ func TestCallIterator_Count_Integer(t *testing.T) {
}
}
// Ensure that a string iterator can be created for a count() call.
func TestCallIterator_Count_String(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&StringIterator{Points: []influxql.StringPoint{
{Name: "cpu", Time: 0, Value: "d", Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: "c", Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 2, Value: "b", Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: "b", Tags: ParseTags("region=us-west,host=hostA")},
{Name: "cpu", Time: 5, Value: "e", Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 23, Value: "a", Tags: ParseTags("region=us-west,host=hostB")},
{Name: "mem", Time: 23, Value: "b", Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`count("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a boolean iterator can be created for a count() call.
func TestCallIterator_Count_Boolean(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&BooleanIterator{Points: []influxql.BooleanPoint{
{Name: "cpu", Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostA")},
{Name: "cpu", Time: 5, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 23, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "mem", Time: 23, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`count("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a float iterator can be created for a min() call.
func TestCallIterator_Min_Float(t *testing.T) {
itr, _ := influxql.NewCallIterator(
@ -276,12 +340,12 @@ func TestCallIterator_First_Float(t *testing.T) {
},
)
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -306,12 +370,72 @@ func TestCallIterator_First_Integer(t *testing.T) {
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a string iterator can be created for a first() call.
func TestCallIterator_First_String(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&StringIterator{Points: []influxql.StringPoint{
{Time: 1, Value: "c", Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: "b", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: "d", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: "b", Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: "e", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: "a", Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`first("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.StringPoint{Time: 0, Value: "d", Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.StringPoint{Time: 0, Value: "c", Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.StringPoint{Time: 5, Value: "e", Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.StringPoint{Time: 20, Value: "a", Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a boolean iterator can be created for a first() call.
func TestCallIterator_First_Boolean(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&BooleanIterator{Points: []influxql.BooleanPoint{
{Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: false, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`first("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 5, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 20, Value: false, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -336,12 +460,12 @@ func TestCallIterator_Last_Float(t *testing.T) {
},
)
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -366,12 +490,72 @@ func TestCallIterator_Last_Integer(t *testing.T) {
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a string iterator can be created for a last() call.
func TestCallIterator_Last_String(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&StringIterator{Points: []influxql.StringPoint{
{Time: 1, Value: "c", Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: "b", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: "d", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: "b", Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: "e", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: "a", Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`last("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.StringPoint{Time: 0, Value: "b", Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.StringPoint{Time: 0, Value: "c", Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.StringPoint{Time: 5, Value: "e", Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.StringPoint{Time: 20, Value: "a", Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a boolean iterator can be created for a last() call.
func TestCallIterator_Last_Boolean(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&BooleanIterator{Points: []influxql.BooleanPoint{
{Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: false, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`last("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.BooleanPoint{Time: 0, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 5, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 20, Value: false, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

View File

@ -17,7 +17,7 @@ type FloatPointEmitter interface {
}
// FloatReduceFunc is the function called by a FloatPoint reducer.
type FloatReduceFunc func(prev, curr *FloatPoint) (t int64, v float64, aux []interface{})
type FloatReduceFunc func(prev *FloatPoint, curr *FloatPoint) (t int64, v float64, aux []interface{})
type FloatFuncReducer struct {
prev *FloatPoint
@ -43,6 +43,87 @@ func (r *FloatFuncReducer) Emit() *FloatPoint {
return r.prev
}
// FloatReduceIntegerFunc is the function called by a FloatPoint reducer.
type FloatReduceIntegerFunc func(prev *IntegerPoint, curr *FloatPoint) (t int64, v int64, aux []interface{})
type FloatFuncIntegerReducer struct {
prev *IntegerPoint
fn FloatReduceIntegerFunc
}
func NewFloatFuncIntegerReducer(fn FloatReduceIntegerFunc) *FloatFuncIntegerReducer {
return &FloatFuncIntegerReducer{fn: fn}
}
func (r *FloatFuncIntegerReducer) Aggregate(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *FloatFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
}
// FloatReduceStringFunc is the function called by a FloatPoint reducer.
type FloatReduceStringFunc func(prev *StringPoint, curr *FloatPoint) (t int64, v string, aux []interface{})
type FloatFuncStringReducer struct {
prev *StringPoint
fn FloatReduceStringFunc
}
func NewFloatFuncStringReducer(fn FloatReduceStringFunc) *FloatFuncStringReducer {
return &FloatFuncStringReducer{fn: fn}
}
func (r *FloatFuncStringReducer) Aggregate(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *FloatFuncStringReducer) Emit() *StringPoint {
return r.prev
}
// FloatReduceBooleanFunc is the function called by a FloatPoint reducer.
type FloatReduceBooleanFunc func(prev *BooleanPoint, curr *FloatPoint) (t int64, v bool, aux []interface{})
type FloatFuncBooleanReducer struct {
prev *BooleanPoint
fn FloatReduceBooleanFunc
}
func NewFloatFuncBooleanReducer(fn FloatReduceBooleanFunc) *FloatFuncBooleanReducer {
return &FloatFuncBooleanReducer{fn: fn}
}
func (r *FloatFuncBooleanReducer) Aggregate(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *FloatFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
}
// IntegerPointAggregator aggregates points to produce a single point.
type IntegerPointAggregator interface {
Aggregate(p *IntegerPoint)
@ -53,8 +134,35 @@ type IntegerPointEmitter interface {
Emit() *IntegerPoint
}
// IntegerReduceFloatFunc is the function called by a IntegerPoint reducer.
type IntegerReduceFloatFunc func(prev *FloatPoint, curr *IntegerPoint) (t int64, v float64, aux []interface{})
type IntegerFuncFloatReducer struct {
prev *FloatPoint
fn IntegerReduceFloatFunc
}
func NewIntegerFuncFloatReducer(fn IntegerReduceFloatFunc) *IntegerFuncFloatReducer {
return &IntegerFuncFloatReducer{fn: fn}
}
func (r *IntegerFuncFloatReducer) Aggregate(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *IntegerFuncFloatReducer) Emit() *FloatPoint {
return r.prev
}
// IntegerReduceFunc is the function called by a IntegerPoint reducer.
type IntegerReduceFunc func(prev, curr *IntegerPoint) (t int64, v int64, aux []interface{})
type IntegerReduceFunc func(prev *IntegerPoint, curr *IntegerPoint) (t int64, v int64, aux []interface{})
type IntegerFuncReducer struct {
prev *IntegerPoint
@ -80,6 +188,60 @@ func (r *IntegerFuncReducer) Emit() *IntegerPoint {
return r.prev
}
// IntegerReduceStringFunc is the function called by a IntegerPoint reducer.
type IntegerReduceStringFunc func(prev *StringPoint, curr *IntegerPoint) (t int64, v string, aux []interface{})
type IntegerFuncStringReducer struct {
prev *StringPoint
fn IntegerReduceStringFunc
}
func NewIntegerFuncStringReducer(fn IntegerReduceStringFunc) *IntegerFuncStringReducer {
return &IntegerFuncStringReducer{fn: fn}
}
func (r *IntegerFuncStringReducer) Aggregate(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *IntegerFuncStringReducer) Emit() *StringPoint {
return r.prev
}
// IntegerReduceBooleanFunc is the function called by a IntegerPoint reducer.
type IntegerReduceBooleanFunc func(prev *BooleanPoint, curr *IntegerPoint) (t int64, v bool, aux []interface{})
type IntegerFuncBooleanReducer struct {
prev *BooleanPoint
fn IntegerReduceBooleanFunc
}
func NewIntegerFuncBooleanReducer(fn IntegerReduceBooleanFunc) *IntegerFuncBooleanReducer {
return &IntegerFuncBooleanReducer{fn: fn}
}
func (r *IntegerFuncBooleanReducer) Aggregate(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *IntegerFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
}
// StringPointAggregator aggregates points to produce a single point.
type StringPointAggregator interface {
Aggregate(p *StringPoint)
@ -90,8 +252,62 @@ type StringPointEmitter interface {
Emit() *StringPoint
}
// StringReduceFloatFunc is the function called by a StringPoint reducer.
type StringReduceFloatFunc func(prev *FloatPoint, curr *StringPoint) (t int64, v float64, aux []interface{})
type StringFuncFloatReducer struct {
prev *FloatPoint
fn StringReduceFloatFunc
}
func NewStringFuncFloatReducer(fn StringReduceFloatFunc) *StringFuncFloatReducer {
return &StringFuncFloatReducer{fn: fn}
}
func (r *StringFuncFloatReducer) Aggregate(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *StringFuncFloatReducer) Emit() *FloatPoint {
return r.prev
}
// StringReduceIntegerFunc is the function called by a StringPoint reducer.
type StringReduceIntegerFunc func(prev *IntegerPoint, curr *StringPoint) (t int64, v int64, aux []interface{})
type StringFuncIntegerReducer struct {
prev *IntegerPoint
fn StringReduceIntegerFunc
}
func NewStringFuncIntegerReducer(fn StringReduceIntegerFunc) *StringFuncIntegerReducer {
return &StringFuncIntegerReducer{fn: fn}
}
func (r *StringFuncIntegerReducer) Aggregate(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *StringFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
}
// StringReduceFunc is the function called by a StringPoint reducer.
type StringReduceFunc func(prev, curr *StringPoint) (t int64, v string, aux []interface{})
type StringReduceFunc func(prev *StringPoint, curr *StringPoint) (t int64, v string, aux []interface{})
type StringFuncReducer struct {
prev *StringPoint
@ -117,6 +333,33 @@ func (r *StringFuncReducer) Emit() *StringPoint {
return r.prev
}
// StringReduceBooleanFunc is the function called by a StringPoint reducer.
type StringReduceBooleanFunc func(prev *BooleanPoint, curr *StringPoint) (t int64, v bool, aux []interface{})
type StringFuncBooleanReducer struct {
prev *BooleanPoint
fn StringReduceBooleanFunc
}
func NewStringFuncBooleanReducer(fn StringReduceBooleanFunc) *StringFuncBooleanReducer {
return &StringFuncBooleanReducer{fn: fn}
}
func (r *StringFuncBooleanReducer) Aggregate(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *StringFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
}
// BooleanPointAggregator aggregates points to produce a single point.
type BooleanPointAggregator interface {
Aggregate(p *BooleanPoint)
@ -127,8 +370,89 @@ type BooleanPointEmitter interface {
Emit() *BooleanPoint
}
// BooleanReduceFloatFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFloatFunc func(prev *FloatPoint, curr *BooleanPoint) (t int64, v float64, aux []interface{})
type BooleanFuncFloatReducer struct {
prev *FloatPoint
fn BooleanReduceFloatFunc
}
func NewBooleanFuncFloatReducer(fn BooleanReduceFloatFunc) *BooleanFuncFloatReducer {
return &BooleanFuncFloatReducer{fn: fn}
}
func (r *BooleanFuncFloatReducer) Aggregate(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *BooleanFuncFloatReducer) Emit() *FloatPoint {
return r.prev
}
// BooleanReduceIntegerFunc is the function called by a BooleanPoint reducer.
type BooleanReduceIntegerFunc func(prev *IntegerPoint, curr *BooleanPoint) (t int64, v int64, aux []interface{})
type BooleanFuncIntegerReducer struct {
prev *IntegerPoint
fn BooleanReduceIntegerFunc
}
func NewBooleanFuncIntegerReducer(fn BooleanReduceIntegerFunc) *BooleanFuncIntegerReducer {
return &BooleanFuncIntegerReducer{fn: fn}
}
func (r *BooleanFuncIntegerReducer) Aggregate(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *BooleanFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
}
// BooleanReduceStringFunc is the function called by a BooleanPoint reducer.
type BooleanReduceStringFunc func(prev *StringPoint, curr *BooleanPoint) (t int64, v string, aux []interface{})
type BooleanFuncStringReducer struct {
prev *StringPoint
fn BooleanReduceStringFunc
}
func NewBooleanFuncStringReducer(fn BooleanReduceStringFunc) *BooleanFuncStringReducer {
return &BooleanFuncStringReducer{fn: fn}
}
func (r *BooleanFuncStringReducer) Aggregate(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *BooleanFuncStringReducer) Emit() *StringPoint {
return r.prev
}
// BooleanReduceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFunc func(prev, curr *BooleanPoint) (t int64, v bool, aux []interface{})
type BooleanReduceFunc func(prev *BooleanPoint, curr *BooleanPoint) (t int64, v bool, aux []interface{})
type BooleanFuncReducer struct {
prev *BooleanPoint

View File

@ -1,33 +1,35 @@
package influxql
{{range .}}
{{with $types := .}}{{range $k := $types}}
// {{.Name}}PointAggregator aggregates points to produce a single point.
type {{.Name}}PointAggregator interface {
// {{$k.Name}}PointAggregator aggregates points to produce a single point.
type {{$k.Name}}PointAggregator interface {
Aggregate(p *{{.Name}}Point)
}
// {{.Name}}PointEmitter produces a single point from an aggregate.
type {{.Name}}PointEmitter interface {
Emit() *{{.Name}}Point
// {{$k.Name}}PointEmitter produces a single point from an aggregate.
type {{$k.Name}}PointEmitter interface {
Emit() *{{$k.Name}}Point
}
// {{.Name}}ReduceFunc is the function called by a {{.Name}}Point reducer.
type {{.Name}}ReduceFunc func(prev, curr *{{.Name}}Point) (t int64, v {{.Type}}, aux []interface{})
{{range $v := $types}}
type {{.Name}}FuncReducer struct {
prev *{{.Name}}Point
fn {{.Name}}ReduceFunc
// {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func is the function called by a {{$k.Name}}Point reducer.
type {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func func(prev *{{$v.Name}}Point, curr *{{$k.Name}}Point) (t int64, v {{$v.Type}}, aux []interface{})
type {{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer struct {
prev *{{$v.Name}}Point
fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func
}
func New{{.Name}}FuncReducer(fn {{.Name}}ReduceFunc) *{{.Name}}FuncReducer {
return &{{.Name}}FuncReducer{fn: fn}
func New{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func) *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer {
return &{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer{fn: fn}
}
func (r *{{.Name}}FuncReducer) Aggregate(p *{{.Name}}Point) {
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate(p *{{$k.Name}}Point) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &{{.Name}}Point{}
r.prev = &{{$v.Name}}Point{}
}
r.prev.Time = t
r.prev.Value = v
@ -35,7 +37,7 @@ func (r *{{.Name}}FuncReducer) Aggregate(p *{{.Name}}Point) {
r.prev.Aggregated++
}
func (r *{{.Name}}FuncReducer) Emit() *{{.Name}}Point {
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() *{{$v.Name}}Point {
return r.prev
}
{{end}}
{{end}}{{end}}{{end}}

File diff suppressed because it is too large Load Diff

View File

@ -12,22 +12,22 @@ import (
"github.com/gogo/protobuf/proto"
)
{{range .}}
{{with $types := .}}{{range $k := $types}}
// {{.Name}}Iterator represents a stream of {{.name}} points.
type {{.Name}}Iterator interface {
// {{$k.Name}}Iterator represents a stream of {{$k.name}} points.
type {{$k.Name}}Iterator interface {
Iterator
Next() *{{.Name}}Point
Next() *{{$k.Name}}Point
}
// new{{.Name}}Iterators converts a slice of Iterator to a slice of {{.Name}}Iterator.
// Drop and closes any iterator in itrs that is not a {{.Name}}Iterator and cannot
// be cast to a {{.Name}}Iterator.
func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator {
a := make([]{{.Name}}Iterator, 0, len(itrs))
// new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator.
// Drop and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot
// be cast to a {{$k.Name}}Iterator.
func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator {
a := make([]{{$k.Name}}Iterator, 0, len(itrs))
for _, itr := range itrs {
switch itr := itr.(type) {
case {{.Name}}Iterator:
case {{$k.Name}}Iterator:
a = append(a, itr)
{{if eq .Name "Float"}}
case IntegerIterator:
@ -41,24 +41,22 @@ func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator {
}
// buf{{.Name}}Iterator represents a buffered {{.Name}}Iterator.
type buf{{.Name}}Iterator struct {
itr {{.Name}}Iterator
buf *{{.Name}}Point
// buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator.
type buf{{$k.Name}}Iterator struct {
itr {{$k.Name}}Iterator
buf *{{$k.Name}}Point
}
// newBuf{{.Name}}Iterator returns a buffered {{.Name}}Iterator.
func newBuf{{.Name}}Iterator(itr {{.Name}}Iterator) *buf{{.Name}}Iterator {
return &buf{{.Name}}Iterator{
itr: itr,
}
// newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator.
func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator {
return &buf{{$k.Name}}Iterator{itr: itr}
}
// Close closes the underlying iterator.
func (itr *buf{{.Name}}Iterator) Close() error { return itr.itr.Close() }
func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *buf{{.Name}}Iterator) peek() *{{.Name}}Point {
func (itr *buf{{$k.Name}}Iterator) peek() *{{$k.Name}}Point {
p := itr.Next()
itr.unread(p)
return p
@ -66,7 +64,7 @@ func (itr *buf{{.Name}}Iterator) peek() *{{.Name}}Point {
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *buf{{.Name}}Iterator) peekTime() int64 {
func (itr *buf{{$k.Name}}Iterator) peekTime() int64 {
p := itr.peek()
if p == nil {
return ZeroTime
@ -75,7 +73,7 @@ func (itr *buf{{.Name}}Iterator) peekTime() int64 {
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *buf{{.Name}}Iterator) Next() *{{.Name}}Point {
func (itr *buf{{$k.Name}}Iterator) Next() *{{$k.Name}}Point {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
@ -86,7 +84,7 @@ func (itr *buf{{.Name}}Iterator) Next() *{{.Name}}Point {
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *buf{{.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{.Name}}Point {
func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{$k.Name}}Point {
v := itr.Next()
if v == nil {
return nil
@ -98,15 +96,15 @@ func (itr *buf{{.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{.Name
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *buf{{.Name}}Iterator) unread(v *{{.Name}}Point) { itr.buf = v }
func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v }
// {{.name}}MergeIterator represents an iterator that combines multiple {{.name}} iterators.
type {{.name}}MergeIterator struct {
inputs []{{.Name}}Iterator
heap *{{.name}}MergeHeap
// {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators.
type {{$k.name}}MergeIterator struct {
inputs []{{$k.Name}}Iterator
heap *{{$k.name}}MergeHeap
// Current iterator and window.
curr *{{.name}}MergeHeapItem
curr *{{$k.name}}MergeHeapItem
window struct {
name string
tags string
@ -115,12 +113,12 @@ type {{.name}}MergeIterator struct {
}
}
// new{{.Name}}MergeIterator returns a new instance of {{.name}}MergeIterator.
func new{{.Name}}MergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions) *{{.name}}MergeIterator {
itr := &{{.name}}MergeIterator{
// new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator.
func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator {
itr := &{{$k.name}}MergeIterator{
inputs: inputs,
heap: &{{.name}}MergeHeap{
items: make([]*{{.name}}MergeHeapItem, 0, len(inputs)),
heap: &{{$k.name}}MergeHeap{
items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
@ -128,13 +126,13 @@ func new{{.Name}}MergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions)
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBuf{{.Name}}Iterator(input)
bufInput := newBuf{{$k.Name}}Iterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &{{.name}}MergeHeapItem{itr: bufInput})
itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
@ -142,7 +140,7 @@ func new{{.Name}}MergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions)
}
// Close closes the underlying iterators.
func (itr *{{.name}}MergeIterator) Close() error {
func (itr *{{$k.name}}MergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
@ -150,14 +148,14 @@ func (itr *{{.name}}MergeIterator) Close() error {
}
// Next returns the next point from the iterator.
func (itr *{{.name}}MergeIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}MergeIterator) Next() *{{$k.Name}}Point {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*{{.name}}MergeHeapItem)
itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
@ -199,16 +197,16 @@ func (itr *{{.name}}MergeIterator) Next() *{{.Name}}Point {
}
}
// {{.name}}MergeHeap represents a heap of {{.name}}MergeHeapItems.
// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type {{.name}}MergeHeap struct {
type {{$k.name}}MergeHeap struct {
opt IteratorOptions
items []*{{.name}}MergeHeapItem
items []*{{$k.name}}MergeHeapItem
}
func (h {{.name}}MergeHeap) Len() int { return len(h.items) }
func (h {{.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h {{.name}}MergeHeap) Less(i, j int) bool {
func (h {{$k.name}}MergeHeap) Len() int { return len(h.items) }
func (h {{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h {{$k.name}}MergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
if h.opt.Ascending {
@ -235,11 +233,11 @@ func (h {{.name}}MergeHeap) Less(i, j int) bool {
}
func (h *{{.name}}MergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*{{.name}}MergeHeapItem))
func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*{{$k.name}}MergeHeapItem))
}
func (h *{{.name}}MergeHeap) Pop() interface{} {
func (h *{{$k.name}}MergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
@ -247,23 +245,23 @@ func (h *{{.name}}MergeHeap) Pop() interface{} {
return item
}
type {{.name}}MergeHeapItem struct {
itr *buf{{.Name}}Iterator
type {{$k.name}}MergeHeapItem struct {
itr *buf{{$k.Name}}Iterator
}
// {{.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{.name}}SortedMergeIterator struct {
inputs []{{.Name}}Iterator
// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{$k.name}}SortedMergeIterator struct {
inputs []{{$k.Name}}Iterator
opt IteratorOptions
heap {{.name}}SortedMergeHeap
heap {{$k.name}}SortedMergeHeap
}
// new{{.Name}}SortedMergeIterator returns an instance of {{.name}}SortedMergeIterator.
func new{{.Name}}SortedMergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions) Iterator {
itr := &{{.name}}SortedMergeIterator{
// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
itr := &{{$k.name}}SortedMergeIterator{
inputs: inputs,
heap: make({{.name}}SortedMergeHeap, 0, len(inputs)),
heap: make({{$k.name}}SortedMergeHeap, 0, len(inputs)),
opt: opt,
}
@ -276,7 +274,7 @@ func new{{.Name}}SortedMergeIterator(inputs []{{.Name}}Iterator, opt IteratorOpt
}
// Append to the heap.
itr.heap = append(itr.heap, &{{.name}}SortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
itr.heap = append(itr.heap, &{{$k.name}}SortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
}
heap.Init(&itr.heap)
@ -284,7 +282,7 @@ func new{{.Name}}SortedMergeIterator(inputs []{{.Name}}Iterator, opt IteratorOpt
}
// Close closes the underlying iterators.
func (itr *{{.name}}SortedMergeIterator) Close() error {
func (itr *{{$k.name}}SortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
@ -292,17 +290,17 @@ func (itr *{{.name}}SortedMergeIterator) Close() error {
}
// Next returns the next points from the iterator.
func (itr *{{.name}}SortedMergeIterator) Next() *{{.Name}}Point { return itr.pop() }
func (itr *{{$k.name}}SortedMergeIterator) Next() *{{$k.Name}}Point { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *{{.name}}SortedMergeIterator) pop() *{{.Name}}Point {
func (itr *{{$k.name}}SortedMergeIterator) pop() *{{$k.Name}}Point {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
item := heap.Pop(&itr.heap).(*{{.name}}SortedMergeHeapItem)
item := heap.Pop(&itr.heap).(*{{$k.name}}SortedMergeHeapItem)
// Copy the point for return.
p := item.point.Clone()
@ -315,12 +313,12 @@ func (itr *{{.name}}SortedMergeIterator) pop() *{{.Name}}Point {
return p
}
// {{.name}}SortedMergeHeap represents a heap of {{.name}}SortedMergeHeapItems.
type {{.name}}SortedMergeHeap []*{{.name}}SortedMergeHeapItem
// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
type {{$k.name}}SortedMergeHeap []*{{$k.name}}SortedMergeHeapItem
func (h {{.name}}SortedMergeHeap) Len() int { return len(h) }
func (h {{.name}}SortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h {{.name}}SortedMergeHeap) Less(i, j int) bool {
func (h {{$k.name}}SortedMergeHeap) Len() int { return len(h) }
func (h {{$k.name}}SortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h {{$k.name}}SortedMergeHeap) Less(i, j int) bool {
x, y := h[i].point, h[j].point
if h[i].ascending {
@ -340,11 +338,11 @@ func (h {{.name}}SortedMergeHeap) Less(i, j int) bool {
return x.Time > y.Time
}
func (h *{{.name}}SortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*{{.name}}SortedMergeHeapItem))
func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*{{$k.name}}SortedMergeHeapItem))
}
func (h *{{.name}}SortedMergeHeap) Pop() interface{} {
func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
@ -352,15 +350,15 @@ func (h *{{.name}}SortedMergeHeap) Pop() interface{} {
return item
}
type {{.name}}SortedMergeHeapItem struct {
point *{{.Name}}Point
itr {{.Name}}Iterator
type {{$k.name}}SortedMergeHeapItem struct {
point *{{$k.Name}}Point
itr {{$k.Name}}Iterator
ascending bool
}
// {{.name}}LimitIterator represents an iterator that limits points per group.
type {{.name}}LimitIterator struct {
input {{.Name}}Iterator
// {{$k.name}}LimitIterator represents an iterator that limits points per group.
type {{$k.name}}LimitIterator struct {
input {{$k.Name}}Iterator
opt IteratorOptions
n int
@ -370,19 +368,19 @@ type {{.name}}LimitIterator struct {
}
}
// new{{.Name}}LimitIterator returns a new instance of {{.name}}LimitIterator.
func new{{.Name}}LimitIterator(input {{.Name}}Iterator, opt IteratorOptions) *{{.name}}LimitIterator {
return &{{.name}}LimitIterator{
// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator.
func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
return &{{$k.name}}LimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *{{.name}}LimitIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}LimitIterator) Next() *{{$k.Name}}Point {
for {
p := itr.input.Next()
if p == nil {
@ -417,9 +415,9 @@ func (itr *{{.name}}LimitIterator) Next() *{{.Name}}Point {
}
}
type {{.name}}FillIterator struct {
input *buf{{.Name}}Iterator
prev *{{.Name}}Point
type {{$k.name}}FillIterator struct {
input *buf{{$k.Name}}Iterator
prev *{{$k.Name}}Point
startTime int64
endTime int64
auxFields []interface{}
@ -433,11 +431,11 @@ type {{.name}}FillIterator struct {
}
}
func new{{.Name}}FillIterator(input {{.Name}}Iterator, expr Expr, opt IteratorOptions) *{{.name}}FillIterator {
func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
if opt.Fill == NullFill {
if expr, ok := expr.(*Call); ok && expr.Name == "count" {
opt.Fill = NumberFill
opt.FillValue = {{.Zero}}
opt.FillValue = {{$k.Zero}}
}
}
@ -455,8 +453,8 @@ func new{{.Name}}FillIterator(input {{.Name}}Iterator, expr Expr, opt IteratorOp
auxFields = make([]interface{}, len(opt.Aux))
}
itr := &{{.name}}FillIterator{
input: newBuf{{.Name}}Iterator(input),
itr := &{{$k.name}}FillIterator{
input: newBuf{{$k.Name}}Iterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
@ -473,9 +471,9 @@ func new{{.Name}}FillIterator(input {{.Name}}Iterator, expr Expr, opt IteratorOp
return itr
}
func (itr *{{.name}}FillIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}FillIterator) Close() error { return itr.input.Close() }
func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}FillIterator) Next() *{{$k.Name}}Point {
p := itr.input.Next()
// Check if the next point is outside of our window or is nil.
@ -515,7 +513,7 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
itr.input.unread(p)
}
p = &{{.Name}}Point{
p = &{{$k.Name}}Point{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
@ -526,7 +524,7 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
case NullFill:
p.Nil = true
case NumberFill:
p.Value = castTo{{.Name}}(itr.opt.FillValue)
p.Value = castTo{{$k.Name}}(itr.opt.FillValue)
case PreviousFill:
if itr.prev != nil {
p.Value = itr.prev.Value
@ -550,27 +548,27 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
return p
}
// {{.name}}AuxIterator represents a {{.name}} implementation of AuxIterator.
type {{.name}}AuxIterator struct {
input *buf{{.Name}}Iterator
output chan *{{.Name}}Point
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
input *buf{{$k.Name}}Iterator
output chan *{{$k.Name}}Point
fields auxIteratorFields
}
func new{{.Name}}AuxIterator(input {{.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{.name}}AuxIterator {
return &{{.name}}AuxIterator{
input: newBuf{{.Name}}Iterator(input),
output: make(chan *{{.Name}}Point, 1),
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator {
return &{{$k.name}}AuxIterator{
input: newBuf{{$k.Name}}Iterator(input),
output: make(chan *{{$k.Name}}Point, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
}
func (itr *{{.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output }
func (itr *{{.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{$k.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output }
func (itr *{{$k.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *{{.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
func (itr *{{$k.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
@ -584,15 +582,15 @@ func (itr *{{.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator,
}
}
func (itr *{{.name}}AuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
func (itr *{{$k.name}}AuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
return nil, nil, errors.New("not implemented")
}
func (itr *{{.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
return nil, errors.New("not implemented")
}
func (itr *{{.name}}AuxIterator) stream() {
func (itr *{{$k.name}}AuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
@ -609,32 +607,34 @@ func (itr *{{.name}}AuxIterator) stream() {
itr.fields.close()
}
// {{.name}}ChanIterator represents a new instance of {{.name}}ChanIterator.
type {{.name}}ChanIterator struct {
c chan *{{.Name}}Point
// {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator.
type {{$k.name}}ChanIterator struct {
c chan *{{$k.Name}}Point
once sync.Once
}
func (itr *{{.name}}ChanIterator) Close() error {
func (itr *{{$k.name}}ChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *{{.name}}ChanIterator) Next() *{{.Name}}Point { return <-itr.c }
func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point { return <-itr.c }
// {{.name}}ReduceIterator executes a reducer for every interval and buffers the result.
type {{.name}}ReduceIterator struct {
input *buf{{.Name}}Iterator
create func() ({{.Name}}PointAggregator, {{.Name}}PointEmitter)
{{range $v := $types}}
// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
input *buf{{$k.Name}}Iterator
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
opt IteratorOptions
points []*{{.Name}}Point
points []*{{$v.Name}}Point
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}ReduceIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}ReduceIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() *{{.Name}}Point {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
@ -649,22 +649,22 @@ func (itr *{{.name}}ReduceIterator) Next() *{{.Name}}Point {
return p
}
// {{.name}}ReducePoint stores the reduced data for a name/tag combination.
type {{.name}}ReducePoint struct {
// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination.
type {{$k.name}}Reduce{{$v.Name}}Point struct {
Name string
Tags Tags
Aggregator {{.Name}}PointAggregator
Emitter {{.Name}}PointEmitter
Aggregator {{$k.Name}}PointAggregator
Emitter {{$v.Name}}PointEmitter
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []*{{$v.Name}}Point {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
// Create points by tags.
m := make(map[string]*{{.name}}ReducePoint)
m := make(map[string]*{{$k.name}}Reduce{{.Name}}Point)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
@ -680,7 +680,7 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
rp := m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &{{.name}}ReducePoint{
rp = &{{$k.name}}Reduce{{.Name}}Point{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
@ -698,7 +698,7 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*{{.Name}}Point, len(m))
a := make([]*{{$v.Name}}Point, len(m))
for i, k := range keys {
rp := m[k]
p := rp.Emitter.Emit()
@ -710,20 +710,21 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
return a
}
{{end}}
// {{.name}}ReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type {{.name}}ReduceSliceIterator struct {
input *buf{{.Name}}Iterator
fn {{.name}}ReduceSliceFunc
// {{$k.name}}ReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type {{$k.name}}ReduceSliceIterator struct {
input *buf{{$k.Name}}Iterator
fn {{$k.name}}ReduceSliceFunc
opt IteratorOptions
points []{{.Name}}Point
points []{{$k.Name}}Point
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}ReduceSliceIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}ReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}ReduceSliceIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}ReduceSliceIterator) Next() *{{$k.Name}}Point {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
@ -740,7 +741,7 @@ func (itr *{{.name}}ReduceSliceIterator) Next() *{{.Name}}Point {
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
func (itr *{{$k.name}}ReduceSliceIterator) reduce() []{{$k.Name}}Point {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
@ -753,7 +754,7 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
groups := make(map[string]struct {
name string
tags Tags
points []{{.Name}}Point
points []{{$k.Name}}Point
})
for {
// Read next point.
@ -775,7 +776,7 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
}
// Reduce each set into a set of values.
results := make(map[string][]{{.Name}}Point)
results := make(map[string][]{{$k.Name}}Point)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
@ -798,7 +799,7 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]{{.Name}}Point, 0, len(results))
a := make([]{{$k.Name}}Point, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
@ -808,21 +809,21 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
return a
}
// {{.name}}ReduceSliceFunc is the function called by a {{.Name}}Point slice reducer.
type {{.name}}ReduceSliceFunc func(a []{{.Name}}Point, opt *reduceOptions) []{{.Name}}Point
// {{$k.name}}ReduceSliceFunc is the function called by a {{$k.Name}}Point slice reducer.
type {{$k.name}}ReduceSliceFunc func(a []{{$k.Name}}Point, opt *reduceOptions) []{{$k.Name}}Point
// {{.name}}ReduceIterator executes a function to modify an existing point for every
// {{$k.name}}ReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type {{.name}}TransformIterator struct {
input {{.Name}}Iterator
fn {{.name}}TransformFunc
type {{$k.name}}TransformIterator struct {
input {{$k.Name}}Iterator
fn {{$k.name}}TransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}TransformIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}TransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}TransformIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}TransformIterator) Next() *{{$k.Name}}Point {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
@ -830,23 +831,23 @@ func (itr *{{.name}}TransformIterator) Next() *{{.Name}}Point {
return p
}
// {{.name}}TransformFunc creates or modifies a point.
// {{$k.name}}TransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type {{.name}}TransformFunc func(p *{{.Name}}Point) *{{.Name}}Point
type {{$k.name}}TransformFunc func(p *{{$k.Name}}Point) *{{$k.Name}}Point
// {{.name}}ReduceIterator executes a function to modify an existing point for every
// {{$k.name}}ReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type {{.name}}BoolTransformIterator struct {
input {{.Name}}Iterator
fn {{.name}}BoolTransformFunc
type {{$k.name}}BoolTransformIterator struct {
input {{$k.Name}}Iterator
fn {{$k.name}}BoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}BoolTransformIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}BoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}BoolTransformIterator) Next() *BooleanPoint {
func (itr *{{$k.name}}BoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
@ -854,33 +855,33 @@ func (itr *{{.name}}BoolTransformIterator) Next() *BooleanPoint {
return nil
}
// {{.name}}BoolTransformFunc creates or modifies a point.
// {{$k.name}}BoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type {{.name}}BoolTransformFunc func(p *{{.Name}}Point) *BooleanPoint
type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint
// {{.name}}DedupeIterator only outputs unique points.
// {{$k.name}}DedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type {{.name}}DedupeIterator struct {
input {{.Name}}Iterator
type {{$k.name}}DedupeIterator struct {
input {{$k.Name}}Iterator
m map[string]struct{} // lookup of points already sent
}
// new{{.Name}}DedupeIterator returns a new instance of {{.name}}DedupeIterator.
func new{{.Name}}DedupeIterator(input {{.Name}}Iterator) *{{.name}}DedupeIterator {
return &{{.name}}DedupeIterator{
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
return &{{$k.name}}DedupeIterator{
input: input,
m: make(map[string]struct{}),
}
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}DedupeIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator.
func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}DedupeIterator) Next() *{{$k.Name}}Point {
for {
// Read next point.
p := itr.input.Next()
@ -889,7 +890,7 @@ func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point {
}
// Serialize to bytes to store in lookup.
buf, err := proto.Marshal(encode{{.Name}}Point(p))
buf, err := proto.Marshal(encode{{$k.Name}}Point(p))
if err != nil {
log.Println("error marshaling dedupe point:", err)
continue
@ -906,24 +907,24 @@ func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point {
}
}
// {{.name}}ReaderIterator represents an iterator that streams from a reader.
type {{.name}}ReaderIterator struct {
// {{$k.name}}ReaderIterator represents an iterator that streams from a reader.
type {{$k.name}}ReaderIterator struct {
r io.Reader
dec *{{.Name}}PointDecoder
first *{{.Name}}Point
dec *{{$k.Name}}PointDecoder
first *{{$k.Name}}Point
}
// new{{.Name}}ReaderIterator returns a new instance of {{.name}}ReaderIterator.
func new{{.Name}}ReaderIterator(r io.Reader, first *{{.Name}}Point) *{{.name}}ReaderIterator {
return &{{.name}}ReaderIterator{
// new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator.
func new{{$k.Name}}ReaderIterator(r io.Reader, first *{{$k.Name}}Point) *{{$k.name}}ReaderIterator {
return &{{$k.name}}ReaderIterator{
r: r,
dec: New{{.Name}}PointDecoder(r),
dec: New{{$k.Name}}PointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *{{.name}}ReaderIterator) Close() error {
func (itr *{{$k.name}}ReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
@ -931,7 +932,7 @@ func (itr *{{.name}}ReaderIterator) Close() error {
}
// Next returns the next point from the iterator.
func (itr *{{.name}}ReaderIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}ReaderIterator) Next() *{{$k.Name}}Point {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
@ -942,8 +943,8 @@ func (itr *{{.name}}ReaderIterator) Next() *{{.Name}}Point {
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &{{.Name}}Point{}
if err := itr.dec.Decode{{.Name}}Point(p); err == io.EOF {
p := &{{$k.Name}}Point{}
if err := itr.dec.Decode{{$k.Name}}Point(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
@ -998,4 +999,4 @@ func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error
}
}
{{end}}
{{end}}{{end}}