Merge pull request #5895 from influxdata/js-5886-more-aggregate-iterator-types

Support all iterator types for count(), first(), and last()
pull/5901/head
Jonathan A. Sternberg 2016-03-03 15:23:57 -05:00
commit caee9a475c
6 changed files with 1959 additions and 392 deletions

View File

@ -62,24 +62,36 @@ func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatFuncReducer(floatCountReduce)
createFn := func() (FloatPointAggregator, IntegerPointEmitter) {
fn := NewFloatFuncIntegerReducer(floatCountReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceIntegerIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerCountReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, IntegerPointEmitter) {
fn := NewStringFuncIntegerReducer(stringCountReduce)
return fn, fn
}
return &stringReduceIntegerIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, IntegerPointEmitter) {
fn := NewBooleanFuncIntegerReducer(booleanCountReduce)
return fn, fn
}
return &booleanReduceIntegerIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported count iterator type: %T", input)
}
}
// floatCountReduce returns the count of points.
func floatCountReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
func floatCountReduce(prev *IntegerPoint, curr *FloatPoint) (int64, int64, []interface{}) {
if prev == nil {
return ZeroTime, 1, nil
}
@ -94,6 +106,22 @@ func integerCountReduce(prev, curr *IntegerPoint) (int64, int64, []interface{})
return ZeroTime, prev.Value + 1, nil
}
// stringCountReduce returns the count of points.
func stringCountReduce(prev *IntegerPoint, curr *StringPoint) (int64, int64, []interface{}) {
if prev == nil {
return ZeroTime, 1, nil
}
return ZeroTime, prev.Value + 1, nil
}
// booleanCountReduce returns the count of points.
func booleanCountReduce(prev *IntegerPoint, curr *BooleanPoint) (int64, int64, []interface{}) {
if prev == nil {
return ZeroTime, 1, nil
}
return ZeroTime, prev.Value + 1, nil
}
// newMinIterator returns an iterator for operating on a min() call.
func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
@ -102,13 +130,13 @@ func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatMinReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerMinReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported min iterator type: %T", input)
}
@ -138,13 +166,13 @@ func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatMaxReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerMaxReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported max iterator type: %T", input)
}
@ -174,13 +202,13 @@ func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatSumReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerSumReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported sum iterator type: %T", input)
}
@ -210,13 +238,25 @@ func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatFirstReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerFirstReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringFuncReducer(stringFirstReduce)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanFuncReducer(booleanFirstReduce)
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported first iterator type: %T", input)
}
@ -238,6 +278,22 @@ func integerFirstReduce(prev, curr *IntegerPoint) (int64, int64, []interface{})
return prev.Time, prev.Value, prev.Aux
}
// stringFirstReduce returns the first point sorted by time.
func stringFirstReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// booleanFirstReduce returns the first point sorted by time.
func booleanFirstReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && !curr.Value && prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// newLastIterator returns an iterator for operating on a last() call.
func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
@ -246,13 +302,25 @@ func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatFuncReducer(floatLastReduce)
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerFuncReducer(integerLastReduce)
return fn, fn
}
return &integerReduceIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringFuncReducer(stringLastReduce)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanFuncReducer(booleanLastReduce)
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported last iterator type: %T", input)
}
@ -274,6 +342,22 @@ func integerLastReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
return prev.Time, prev.Value, prev.Aux
}
// stringLastReduce returns the first point sorted by time.
func stringLastReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// booleanLastReduce returns the first point sorted by time.
func booleanLastReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value && !prev.Value) {
return curr.Time, curr.Value, curr.Aux
}
return prev.Time, prev.Value, prev.Aux
}
// NewDistinctIterator returns an iterator for operating on a distinct() call.
func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
@ -347,7 +431,7 @@ func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
fn := NewFloatMeanReducer()
return fn, fn
}
return &floatReduceIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerMeanReducer()
@ -928,94 +1012,6 @@ func newIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool)
}
}
type integerReduceFloatIterator struct {
input *bufIntegerIterator
create func() (IntegerPointAggregator, FloatPointEmitter)
opt IteratorOptions
points []*FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceFloatIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceFloatIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// floatReduceIntegerPoint stores the reduced data for a name/tag combination.
type integerReduceFloatPoint struct {
Name string
Tags Tags
Aggregator IntegerPointAggregator
Emitter FloatPointEmitter
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceFloatIterator) reduce() []*FloatPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
// Create points by tags.
m := make(map[string]*integerReduceFloatPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &integerReduceFloatPoint{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
Emitter: emitter,
}
m[id] = rp
}
rp.Aggregator.Aggregate(curr)
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*FloatPoint, len(m))
for i, k := range keys {
rp := m[k]
p := rp.Emitter.Emit()
p.Name = rp.Name
p.Tags = rp.Tags
p.Time = startTime
a[i] = p
}
return a
}
// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result.
// This iterator receives an integer iterator but produces a float iterator.
type integerReduceSliceFloatIterator struct {

View File

@ -32,11 +32,11 @@ func TestCallIterator_Count_Float(t *testing.T) {
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
@ -74,6 +74,70 @@ func TestCallIterator_Count_Integer(t *testing.T) {
}
}
// Ensure that a string iterator can be created for a count() call.
func TestCallIterator_Count_String(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&StringIterator{Points: []influxql.StringPoint{
{Name: "cpu", Time: 0, Value: "d", Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: "c", Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 2, Value: "b", Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: "b", Tags: ParseTags("region=us-west,host=hostA")},
{Name: "cpu", Time: 5, Value: "e", Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 23, Value: "a", Tags: ParseTags("region=us-west,host=hostB")},
{Name: "mem", Time: 23, Value: "b", Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`count("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a boolean iterator can be created for a count() call.
func TestCallIterator_Count_Boolean(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&BooleanIterator{Points: []influxql.BooleanPoint{
{Name: "cpu", Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "cpu", Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostA")},
{Name: "cpu", Time: 5, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Name: "cpu", Time: 23, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
{Name: "mem", Time: 23, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`count("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a float iterator can be created for a min() call.
func TestCallIterator_Min_Float(t *testing.T) {
itr, _ := influxql.NewCallIterator(
@ -276,12 +340,12 @@ func TestCallIterator_First_Float(t *testing.T) {
},
)
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -306,12 +370,72 @@ func TestCallIterator_First_Integer(t *testing.T) {
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a string iterator can be created for a first() call.
func TestCallIterator_First_String(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&StringIterator{Points: []influxql.StringPoint{
{Time: 1, Value: "c", Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: "b", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: "d", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: "b", Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: "e", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: "a", Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`first("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.StringPoint{Time: 0, Value: "d", Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.StringPoint{Time: 0, Value: "c", Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.StringPoint{Time: 5, Value: "e", Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.StringPoint{Time: 20, Value: "a", Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a boolean iterator can be created for a first() call.
func TestCallIterator_First_Boolean(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&BooleanIterator{Points: []influxql.BooleanPoint{
{Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: false, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`first("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 5, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 20, Value: false, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -336,12 +460,12 @@ func TestCallIterator_Last_Float(t *testing.T) {
},
)
if a, ok := CompareFloatIterator(itr, []influxql.FloatPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.FloatPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
@ -366,12 +490,72 @@ func TestCallIterator_Last_Integer(t *testing.T) {
},
)
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3},
{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1},
{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1},
{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.IntegerPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.IntegerPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a string iterator can be created for a last() call.
func TestCallIterator_Last_String(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&StringIterator{Points: []influxql.StringPoint{
{Time: 1, Value: "c", Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: "b", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: "d", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: "b", Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: "e", Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: "a", Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`last("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.StringPoint{Time: 0, Value: "b", Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.StringPoint{Time: 0, Value: "c", Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.StringPoint{Time: 5, Value: "e", Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.StringPoint{Time: 20, Value: "a", Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure that a boolean iterator can be created for a last() call.
func TestCallIterator_Last_Boolean(t *testing.T) {
itr, _ := influxql.NewCallIterator(
&BooleanIterator{Points: []influxql.BooleanPoint{
{Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
{Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 1, Value: false, Tags: ParseTags("region=us-west,host=hostA")},
{Time: 6, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
{Time: 23, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
}},
influxql.IteratorOptions{
Expr: MustParseExpr(`last("value")`),
Dimensions: []string{"host"},
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
},
)
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.BooleanPoint{Time: 0, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 3}},
{&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostB"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 5, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 1}},
{&influxql.BooleanPoint{Time: 20, Value: false, Tags: ParseTags("host=hostB"), Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

View File

@ -17,7 +17,7 @@ type FloatPointEmitter interface {
}
// FloatReduceFunc is the function called by a FloatPoint reducer.
type FloatReduceFunc func(prev, curr *FloatPoint) (t int64, v float64, aux []interface{})
type FloatReduceFunc func(prev *FloatPoint, curr *FloatPoint) (t int64, v float64, aux []interface{})
type FloatFuncReducer struct {
prev *FloatPoint
@ -43,6 +43,87 @@ func (r *FloatFuncReducer) Emit() *FloatPoint {
return r.prev
}
// FloatReduceIntegerFunc is the function called by a FloatPoint reducer.
type FloatReduceIntegerFunc func(prev *IntegerPoint, curr *FloatPoint) (t int64, v int64, aux []interface{})
type FloatFuncIntegerReducer struct {
prev *IntegerPoint
fn FloatReduceIntegerFunc
}
func NewFloatFuncIntegerReducer(fn FloatReduceIntegerFunc) *FloatFuncIntegerReducer {
return &FloatFuncIntegerReducer{fn: fn}
}
func (r *FloatFuncIntegerReducer) Aggregate(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *FloatFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
}
// FloatReduceStringFunc is the function called by a FloatPoint reducer.
type FloatReduceStringFunc func(prev *StringPoint, curr *FloatPoint) (t int64, v string, aux []interface{})
type FloatFuncStringReducer struct {
prev *StringPoint
fn FloatReduceStringFunc
}
func NewFloatFuncStringReducer(fn FloatReduceStringFunc) *FloatFuncStringReducer {
return &FloatFuncStringReducer{fn: fn}
}
func (r *FloatFuncStringReducer) Aggregate(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *FloatFuncStringReducer) Emit() *StringPoint {
return r.prev
}
// FloatReduceBooleanFunc is the function called by a FloatPoint reducer.
type FloatReduceBooleanFunc func(prev *BooleanPoint, curr *FloatPoint) (t int64, v bool, aux []interface{})
type FloatFuncBooleanReducer struct {
prev *BooleanPoint
fn FloatReduceBooleanFunc
}
func NewFloatFuncBooleanReducer(fn FloatReduceBooleanFunc) *FloatFuncBooleanReducer {
return &FloatFuncBooleanReducer{fn: fn}
}
func (r *FloatFuncBooleanReducer) Aggregate(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *FloatFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
}
// IntegerPointAggregator aggregates points to produce a single point.
type IntegerPointAggregator interface {
Aggregate(p *IntegerPoint)
@ -53,8 +134,35 @@ type IntegerPointEmitter interface {
Emit() *IntegerPoint
}
// IntegerReduceFloatFunc is the function called by a IntegerPoint reducer.
type IntegerReduceFloatFunc func(prev *FloatPoint, curr *IntegerPoint) (t int64, v float64, aux []interface{})
type IntegerFuncFloatReducer struct {
prev *FloatPoint
fn IntegerReduceFloatFunc
}
func NewIntegerFuncFloatReducer(fn IntegerReduceFloatFunc) *IntegerFuncFloatReducer {
return &IntegerFuncFloatReducer{fn: fn}
}
func (r *IntegerFuncFloatReducer) Aggregate(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *IntegerFuncFloatReducer) Emit() *FloatPoint {
return r.prev
}
// IntegerReduceFunc is the function called by a IntegerPoint reducer.
type IntegerReduceFunc func(prev, curr *IntegerPoint) (t int64, v int64, aux []interface{})
type IntegerReduceFunc func(prev *IntegerPoint, curr *IntegerPoint) (t int64, v int64, aux []interface{})
type IntegerFuncReducer struct {
prev *IntegerPoint
@ -80,6 +188,60 @@ func (r *IntegerFuncReducer) Emit() *IntegerPoint {
return r.prev
}
// IntegerReduceStringFunc is the function called by a IntegerPoint reducer.
type IntegerReduceStringFunc func(prev *StringPoint, curr *IntegerPoint) (t int64, v string, aux []interface{})
type IntegerFuncStringReducer struct {
prev *StringPoint
fn IntegerReduceStringFunc
}
func NewIntegerFuncStringReducer(fn IntegerReduceStringFunc) *IntegerFuncStringReducer {
return &IntegerFuncStringReducer{fn: fn}
}
func (r *IntegerFuncStringReducer) Aggregate(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *IntegerFuncStringReducer) Emit() *StringPoint {
return r.prev
}
// IntegerReduceBooleanFunc is the function called by a IntegerPoint reducer.
type IntegerReduceBooleanFunc func(prev *BooleanPoint, curr *IntegerPoint) (t int64, v bool, aux []interface{})
type IntegerFuncBooleanReducer struct {
prev *BooleanPoint
fn IntegerReduceBooleanFunc
}
func NewIntegerFuncBooleanReducer(fn IntegerReduceBooleanFunc) *IntegerFuncBooleanReducer {
return &IntegerFuncBooleanReducer{fn: fn}
}
func (r *IntegerFuncBooleanReducer) Aggregate(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *IntegerFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
}
// StringPointAggregator aggregates points to produce a single point.
type StringPointAggregator interface {
Aggregate(p *StringPoint)
@ -90,8 +252,62 @@ type StringPointEmitter interface {
Emit() *StringPoint
}
// StringReduceFloatFunc is the function called by a StringPoint reducer.
type StringReduceFloatFunc func(prev *FloatPoint, curr *StringPoint) (t int64, v float64, aux []interface{})
type StringFuncFloatReducer struct {
prev *FloatPoint
fn StringReduceFloatFunc
}
func NewStringFuncFloatReducer(fn StringReduceFloatFunc) *StringFuncFloatReducer {
return &StringFuncFloatReducer{fn: fn}
}
func (r *StringFuncFloatReducer) Aggregate(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *StringFuncFloatReducer) Emit() *FloatPoint {
return r.prev
}
// StringReduceIntegerFunc is the function called by a StringPoint reducer.
type StringReduceIntegerFunc func(prev *IntegerPoint, curr *StringPoint) (t int64, v int64, aux []interface{})
type StringFuncIntegerReducer struct {
prev *IntegerPoint
fn StringReduceIntegerFunc
}
func NewStringFuncIntegerReducer(fn StringReduceIntegerFunc) *StringFuncIntegerReducer {
return &StringFuncIntegerReducer{fn: fn}
}
func (r *StringFuncIntegerReducer) Aggregate(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *StringFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
}
// StringReduceFunc is the function called by a StringPoint reducer.
type StringReduceFunc func(prev, curr *StringPoint) (t int64, v string, aux []interface{})
type StringReduceFunc func(prev *StringPoint, curr *StringPoint) (t int64, v string, aux []interface{})
type StringFuncReducer struct {
prev *StringPoint
@ -117,6 +333,33 @@ func (r *StringFuncReducer) Emit() *StringPoint {
return r.prev
}
// StringReduceBooleanFunc is the function called by a StringPoint reducer.
type StringReduceBooleanFunc func(prev *BooleanPoint, curr *StringPoint) (t int64, v bool, aux []interface{})
type StringFuncBooleanReducer struct {
prev *BooleanPoint
fn StringReduceBooleanFunc
}
func NewStringFuncBooleanReducer(fn StringReduceBooleanFunc) *StringFuncBooleanReducer {
return &StringFuncBooleanReducer{fn: fn}
}
func (r *StringFuncBooleanReducer) Aggregate(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *StringFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
}
// BooleanPointAggregator aggregates points to produce a single point.
type BooleanPointAggregator interface {
Aggregate(p *BooleanPoint)
@ -127,8 +370,89 @@ type BooleanPointEmitter interface {
Emit() *BooleanPoint
}
// BooleanReduceFloatFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFloatFunc func(prev *FloatPoint, curr *BooleanPoint) (t int64, v float64, aux []interface{})
type BooleanFuncFloatReducer struct {
prev *FloatPoint
fn BooleanReduceFloatFunc
}
func NewBooleanFuncFloatReducer(fn BooleanReduceFloatFunc) *BooleanFuncFloatReducer {
return &BooleanFuncFloatReducer{fn: fn}
}
func (r *BooleanFuncFloatReducer) Aggregate(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *BooleanFuncFloatReducer) Emit() *FloatPoint {
return r.prev
}
// BooleanReduceIntegerFunc is the function called by a BooleanPoint reducer.
type BooleanReduceIntegerFunc func(prev *IntegerPoint, curr *BooleanPoint) (t int64, v int64, aux []interface{})
type BooleanFuncIntegerReducer struct {
prev *IntegerPoint
fn BooleanReduceIntegerFunc
}
func NewBooleanFuncIntegerReducer(fn BooleanReduceIntegerFunc) *BooleanFuncIntegerReducer {
return &BooleanFuncIntegerReducer{fn: fn}
}
func (r *BooleanFuncIntegerReducer) Aggregate(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *BooleanFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
}
// BooleanReduceStringFunc is the function called by a BooleanPoint reducer.
type BooleanReduceStringFunc func(prev *StringPoint, curr *BooleanPoint) (t int64, v string, aux []interface{})
type BooleanFuncStringReducer struct {
prev *StringPoint
fn BooleanReduceStringFunc
}
func NewBooleanFuncStringReducer(fn BooleanReduceStringFunc) *BooleanFuncStringReducer {
return &BooleanFuncStringReducer{fn: fn}
}
func (r *BooleanFuncStringReducer) Aggregate(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
}
func (r *BooleanFuncStringReducer) Emit() *StringPoint {
return r.prev
}
// BooleanReduceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFunc func(prev, curr *BooleanPoint) (t int64, v bool, aux []interface{})
type BooleanReduceFunc func(prev *BooleanPoint, curr *BooleanPoint) (t int64, v bool, aux []interface{})
type BooleanFuncReducer struct {
prev *BooleanPoint

View File

@ -1,33 +1,35 @@
package influxql
{{range .}}
{{with $types := .}}{{range $k := $types}}
// {{.Name}}PointAggregator aggregates points to produce a single point.
type {{.Name}}PointAggregator interface {
// {{$k.Name}}PointAggregator aggregates points to produce a single point.
type {{$k.Name}}PointAggregator interface {
Aggregate(p *{{.Name}}Point)
}
// {{.Name}}PointEmitter produces a single point from an aggregate.
type {{.Name}}PointEmitter interface {
Emit() *{{.Name}}Point
// {{$k.Name}}PointEmitter produces a single point from an aggregate.
type {{$k.Name}}PointEmitter interface {
Emit() *{{$k.Name}}Point
}
// {{.Name}}ReduceFunc is the function called by a {{.Name}}Point reducer.
type {{.Name}}ReduceFunc func(prev, curr *{{.Name}}Point) (t int64, v {{.Type}}, aux []interface{})
{{range $v := $types}}
type {{.Name}}FuncReducer struct {
prev *{{.Name}}Point
fn {{.Name}}ReduceFunc
// {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func is the function called by a {{$k.Name}}Point reducer.
type {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func func(prev *{{$v.Name}}Point, curr *{{$k.Name}}Point) (t int64, v {{$v.Type}}, aux []interface{})
type {{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer struct {
prev *{{$v.Name}}Point
fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func
}
func New{{.Name}}FuncReducer(fn {{.Name}}ReduceFunc) *{{.Name}}FuncReducer {
return &{{.Name}}FuncReducer{fn: fn}
func New{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func) *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer {
return &{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer{fn: fn}
}
func (r *{{.Name}}FuncReducer) Aggregate(p *{{.Name}}Point) {
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate(p *{{$k.Name}}Point) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &{{.Name}}Point{}
r.prev = &{{$v.Name}}Point{}
}
r.prev.Time = t
r.prev.Value = v
@ -35,7 +37,7 @@ func (r *{{.Name}}FuncReducer) Aggregate(p *{{.Name}}Point) {
r.prev.Aggregated++
}
func (r *{{.Name}}FuncReducer) Emit() *{{.Name}}Point {
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() *{{$v.Name}}Point {
return r.prev
}
{{end}}
{{end}}{{end}}{{end}}

File diff suppressed because it is too large Load Diff

View File

@ -12,22 +12,22 @@ import (
"github.com/gogo/protobuf/proto"
)
{{range .}}
{{with $types := .}}{{range $k := $types}}
// {{.Name}}Iterator represents a stream of {{.name}} points.
type {{.Name}}Iterator interface {
// {{$k.Name}}Iterator represents a stream of {{$k.name}} points.
type {{$k.Name}}Iterator interface {
Iterator
Next() *{{.Name}}Point
Next() *{{$k.Name}}Point
}
// new{{.Name}}Iterators converts a slice of Iterator to a slice of {{.Name}}Iterator.
// Drop and closes any iterator in itrs that is not a {{.Name}}Iterator and cannot
// be cast to a {{.Name}}Iterator.
func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator {
a := make([]{{.Name}}Iterator, 0, len(itrs))
// new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator.
// Drop and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot
// be cast to a {{$k.Name}}Iterator.
func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator {
a := make([]{{$k.Name}}Iterator, 0, len(itrs))
for _, itr := range itrs {
switch itr := itr.(type) {
case {{.Name}}Iterator:
case {{$k.Name}}Iterator:
a = append(a, itr)
{{if eq .Name "Float"}}
case IntegerIterator:
@ -41,24 +41,22 @@ func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator {
}
// buf{{.Name}}Iterator represents a buffered {{.Name}}Iterator.
type buf{{.Name}}Iterator struct {
itr {{.Name}}Iterator
buf *{{.Name}}Point
// buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator.
type buf{{$k.Name}}Iterator struct {
itr {{$k.Name}}Iterator
buf *{{$k.Name}}Point
}
// newBuf{{.Name}}Iterator returns a buffered {{.Name}}Iterator.
func newBuf{{.Name}}Iterator(itr {{.Name}}Iterator) *buf{{.Name}}Iterator {
return &buf{{.Name}}Iterator{
itr: itr,
}
// newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator.
func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator {
return &buf{{$k.Name}}Iterator{itr: itr}
}
// Close closes the underlying iterator.
func (itr *buf{{.Name}}Iterator) Close() error { return itr.itr.Close() }
func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *buf{{.Name}}Iterator) peek() *{{.Name}}Point {
func (itr *buf{{$k.Name}}Iterator) peek() *{{$k.Name}}Point {
p := itr.Next()
itr.unread(p)
return p
@ -66,7 +64,7 @@ func (itr *buf{{.Name}}Iterator) peek() *{{.Name}}Point {
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *buf{{.Name}}Iterator) peekTime() int64 {
func (itr *buf{{$k.Name}}Iterator) peekTime() int64 {
p := itr.peek()
if p == nil {
return ZeroTime
@ -75,7 +73,7 @@ func (itr *buf{{.Name}}Iterator) peekTime() int64 {
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *buf{{.Name}}Iterator) Next() *{{.Name}}Point {
func (itr *buf{{$k.Name}}Iterator) Next() *{{$k.Name}}Point {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
@ -86,7 +84,7 @@ func (itr *buf{{.Name}}Iterator) Next() *{{.Name}}Point {
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *buf{{.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{.Name}}Point {
func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{$k.Name}}Point {
v := itr.Next()
if v == nil {
return nil
@ -98,15 +96,15 @@ func (itr *buf{{.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{.Name
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *buf{{.Name}}Iterator) unread(v *{{.Name}}Point) { itr.buf = v }
func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v }
// {{.name}}MergeIterator represents an iterator that combines multiple {{.name}} iterators.
type {{.name}}MergeIterator struct {
inputs []{{.Name}}Iterator
heap *{{.name}}MergeHeap
// {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators.
type {{$k.name}}MergeIterator struct {
inputs []{{$k.Name}}Iterator
heap *{{$k.name}}MergeHeap
// Current iterator and window.
curr *{{.name}}MergeHeapItem
curr *{{$k.name}}MergeHeapItem
window struct {
name string
tags string
@ -115,12 +113,12 @@ type {{.name}}MergeIterator struct {
}
}
// new{{.Name}}MergeIterator returns a new instance of {{.name}}MergeIterator.
func new{{.Name}}MergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions) *{{.name}}MergeIterator {
itr := &{{.name}}MergeIterator{
// new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator.
func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator {
itr := &{{$k.name}}MergeIterator{
inputs: inputs,
heap: &{{.name}}MergeHeap{
items: make([]*{{.name}}MergeHeapItem, 0, len(inputs)),
heap: &{{$k.name}}MergeHeap{
items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
@ -128,13 +126,13 @@ func new{{.Name}}MergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions)
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBuf{{.Name}}Iterator(input)
bufInput := newBuf{{$k.Name}}Iterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &{{.name}}MergeHeapItem{itr: bufInput})
itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
@ -142,7 +140,7 @@ func new{{.Name}}MergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions)
}
// Close closes the underlying iterators.
func (itr *{{.name}}MergeIterator) Close() error {
func (itr *{{$k.name}}MergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
@ -150,14 +148,14 @@ func (itr *{{.name}}MergeIterator) Close() error {
}
// Next returns the next point from the iterator.
func (itr *{{.name}}MergeIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}MergeIterator) Next() *{{$k.Name}}Point {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*{{.name}}MergeHeapItem)
itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
@ -199,16 +197,16 @@ func (itr *{{.name}}MergeIterator) Next() *{{.Name}}Point {
}
}
// {{.name}}MergeHeap represents a heap of {{.name}}MergeHeapItems.
// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type {{.name}}MergeHeap struct {
type {{$k.name}}MergeHeap struct {
opt IteratorOptions
items []*{{.name}}MergeHeapItem
items []*{{$k.name}}MergeHeapItem
}
func (h {{.name}}MergeHeap) Len() int { return len(h.items) }
func (h {{.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h {{.name}}MergeHeap) Less(i, j int) bool {
func (h {{$k.name}}MergeHeap) Len() int { return len(h.items) }
func (h {{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h {{$k.name}}MergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
if h.opt.Ascending {
@ -235,11 +233,11 @@ func (h {{.name}}MergeHeap) Less(i, j int) bool {
}
func (h *{{.name}}MergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*{{.name}}MergeHeapItem))
func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*{{$k.name}}MergeHeapItem))
}
func (h *{{.name}}MergeHeap) Pop() interface{} {
func (h *{{$k.name}}MergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
@ -247,23 +245,23 @@ func (h *{{.name}}MergeHeap) Pop() interface{} {
return item
}
type {{.name}}MergeHeapItem struct {
itr *buf{{.Name}}Iterator
type {{$k.name}}MergeHeapItem struct {
itr *buf{{$k.Name}}Iterator
}
// {{.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{.name}}SortedMergeIterator struct {
inputs []{{.Name}}Iterator
// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{$k.name}}SortedMergeIterator struct {
inputs []{{$k.Name}}Iterator
opt IteratorOptions
heap {{.name}}SortedMergeHeap
heap {{$k.name}}SortedMergeHeap
}
// new{{.Name}}SortedMergeIterator returns an instance of {{.name}}SortedMergeIterator.
func new{{.Name}}SortedMergeIterator(inputs []{{.Name}}Iterator, opt IteratorOptions) Iterator {
itr := &{{.name}}SortedMergeIterator{
// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
itr := &{{$k.name}}SortedMergeIterator{
inputs: inputs,
heap: make({{.name}}SortedMergeHeap, 0, len(inputs)),
heap: make({{$k.name}}SortedMergeHeap, 0, len(inputs)),
opt: opt,
}
@ -276,7 +274,7 @@ func new{{.Name}}SortedMergeIterator(inputs []{{.Name}}Iterator, opt IteratorOpt
}
// Append to the heap.
itr.heap = append(itr.heap, &{{.name}}SortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
itr.heap = append(itr.heap, &{{$k.name}}SortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
}
heap.Init(&itr.heap)
@ -284,7 +282,7 @@ func new{{.Name}}SortedMergeIterator(inputs []{{.Name}}Iterator, opt IteratorOpt
}
// Close closes the underlying iterators.
func (itr *{{.name}}SortedMergeIterator) Close() error {
func (itr *{{$k.name}}SortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
@ -292,17 +290,17 @@ func (itr *{{.name}}SortedMergeIterator) Close() error {
}
// Next returns the next points from the iterator.
func (itr *{{.name}}SortedMergeIterator) Next() *{{.Name}}Point { return itr.pop() }
func (itr *{{$k.name}}SortedMergeIterator) Next() *{{$k.Name}}Point { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *{{.name}}SortedMergeIterator) pop() *{{.Name}}Point {
func (itr *{{$k.name}}SortedMergeIterator) pop() *{{$k.Name}}Point {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
item := heap.Pop(&itr.heap).(*{{.name}}SortedMergeHeapItem)
item := heap.Pop(&itr.heap).(*{{$k.name}}SortedMergeHeapItem)
// Copy the point for return.
p := item.point.Clone()
@ -315,12 +313,12 @@ func (itr *{{.name}}SortedMergeIterator) pop() *{{.Name}}Point {
return p
}
// {{.name}}SortedMergeHeap represents a heap of {{.name}}SortedMergeHeapItems.
type {{.name}}SortedMergeHeap []*{{.name}}SortedMergeHeapItem
// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
type {{$k.name}}SortedMergeHeap []*{{$k.name}}SortedMergeHeapItem
func (h {{.name}}SortedMergeHeap) Len() int { return len(h) }
func (h {{.name}}SortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h {{.name}}SortedMergeHeap) Less(i, j int) bool {
func (h {{$k.name}}SortedMergeHeap) Len() int { return len(h) }
func (h {{$k.name}}SortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h {{$k.name}}SortedMergeHeap) Less(i, j int) bool {
x, y := h[i].point, h[j].point
if h[i].ascending {
@ -340,11 +338,11 @@ func (h {{.name}}SortedMergeHeap) Less(i, j int) bool {
return x.Time > y.Time
}
func (h *{{.name}}SortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*{{.name}}SortedMergeHeapItem))
func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*{{$k.name}}SortedMergeHeapItem))
}
func (h *{{.name}}SortedMergeHeap) Pop() interface{} {
func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
@ -352,15 +350,15 @@ func (h *{{.name}}SortedMergeHeap) Pop() interface{} {
return item
}
type {{.name}}SortedMergeHeapItem struct {
point *{{.Name}}Point
itr {{.Name}}Iterator
type {{$k.name}}SortedMergeHeapItem struct {
point *{{$k.Name}}Point
itr {{$k.Name}}Iterator
ascending bool
}
// {{.name}}LimitIterator represents an iterator that limits points per group.
type {{.name}}LimitIterator struct {
input {{.Name}}Iterator
// {{$k.name}}LimitIterator represents an iterator that limits points per group.
type {{$k.name}}LimitIterator struct {
input {{$k.Name}}Iterator
opt IteratorOptions
n int
@ -370,19 +368,19 @@ type {{.name}}LimitIterator struct {
}
}
// new{{.Name}}LimitIterator returns a new instance of {{.name}}LimitIterator.
func new{{.Name}}LimitIterator(input {{.Name}}Iterator, opt IteratorOptions) *{{.name}}LimitIterator {
return &{{.name}}LimitIterator{
// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator.
func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
return &{{$k.name}}LimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *{{.name}}LimitIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}LimitIterator) Next() *{{$k.Name}}Point {
for {
p := itr.input.Next()
if p == nil {
@ -417,9 +415,9 @@ func (itr *{{.name}}LimitIterator) Next() *{{.Name}}Point {
}
}
type {{.name}}FillIterator struct {
input *buf{{.Name}}Iterator
prev *{{.Name}}Point
type {{$k.name}}FillIterator struct {
input *buf{{$k.Name}}Iterator
prev *{{$k.Name}}Point
startTime int64
endTime int64
auxFields []interface{}
@ -433,11 +431,11 @@ type {{.name}}FillIterator struct {
}
}
func new{{.Name}}FillIterator(input {{.Name}}Iterator, expr Expr, opt IteratorOptions) *{{.name}}FillIterator {
func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
if opt.Fill == NullFill {
if expr, ok := expr.(*Call); ok && expr.Name == "count" {
opt.Fill = NumberFill
opt.FillValue = {{.Zero}}
opt.FillValue = {{$k.Zero}}
}
}
@ -455,8 +453,8 @@ func new{{.Name}}FillIterator(input {{.Name}}Iterator, expr Expr, opt IteratorOp
auxFields = make([]interface{}, len(opt.Aux))
}
itr := &{{.name}}FillIterator{
input: newBuf{{.Name}}Iterator(input),
itr := &{{$k.name}}FillIterator{
input: newBuf{{$k.Name}}Iterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
@ -473,9 +471,9 @@ func new{{.Name}}FillIterator(input {{.Name}}Iterator, expr Expr, opt IteratorOp
return itr
}
func (itr *{{.name}}FillIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}FillIterator) Close() error { return itr.input.Close() }
func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}FillIterator) Next() *{{$k.Name}}Point {
p := itr.input.Next()
// Check if the next point is outside of our window or is nil.
@ -515,7 +513,7 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
itr.input.unread(p)
}
p = &{{.Name}}Point{
p = &{{$k.Name}}Point{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
@ -526,7 +524,7 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
case NullFill:
p.Nil = true
case NumberFill:
p.Value = castTo{{.Name}}(itr.opt.FillValue)
p.Value = castTo{{$k.Name}}(itr.opt.FillValue)
case PreviousFill:
if itr.prev != nil {
p.Value = itr.prev.Value
@ -550,27 +548,27 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
return p
}
// {{.name}}AuxIterator represents a {{.name}} implementation of AuxIterator.
type {{.name}}AuxIterator struct {
input *buf{{.Name}}Iterator
output chan *{{.Name}}Point
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
input *buf{{$k.Name}}Iterator
output chan *{{$k.Name}}Point
fields auxIteratorFields
}
func new{{.Name}}AuxIterator(input {{.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{.name}}AuxIterator {
return &{{.name}}AuxIterator{
input: newBuf{{.Name}}Iterator(input),
output: make(chan *{{.Name}}Point, 1),
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator {
return &{{$k.name}}AuxIterator{
input: newBuf{{$k.Name}}Iterator(input),
output: make(chan *{{$k.Name}}Point, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
}
func (itr *{{.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output }
func (itr *{{.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{$k.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output }
func (itr *{{$k.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *{{.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
func (itr *{{$k.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
@ -584,15 +582,15 @@ func (itr *{{.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator,
}
}
func (itr *{{.name}}AuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
func (itr *{{$k.name}}AuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
return nil, nil, errors.New("not implemented")
}
func (itr *{{.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
return nil, errors.New("not implemented")
}
func (itr *{{.name}}AuxIterator) stream() {
func (itr *{{$k.name}}AuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
@ -609,32 +607,34 @@ func (itr *{{.name}}AuxIterator) stream() {
itr.fields.close()
}
// {{.name}}ChanIterator represents a new instance of {{.name}}ChanIterator.
type {{.name}}ChanIterator struct {
c chan *{{.Name}}Point
// {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator.
type {{$k.name}}ChanIterator struct {
c chan *{{$k.Name}}Point
once sync.Once
}
func (itr *{{.name}}ChanIterator) Close() error {
func (itr *{{$k.name}}ChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *{{.name}}ChanIterator) Next() *{{.Name}}Point { return <-itr.c }
func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point { return <-itr.c }
// {{.name}}ReduceIterator executes a reducer for every interval and buffers the result.
type {{.name}}ReduceIterator struct {
input *buf{{.Name}}Iterator
create func() ({{.Name}}PointAggregator, {{.Name}}PointEmitter)
{{range $v := $types}}
// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
input *buf{{$k.Name}}Iterator
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
opt IteratorOptions
points []*{{.Name}}Point
points []*{{$v.Name}}Point
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}ReduceIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}ReduceIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() *{{.Name}}Point {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
@ -649,22 +649,22 @@ func (itr *{{.name}}ReduceIterator) Next() *{{.Name}}Point {
return p
}
// {{.name}}ReducePoint stores the reduced data for a name/tag combination.
type {{.name}}ReducePoint struct {
// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination.
type {{$k.name}}Reduce{{$v.Name}}Point struct {
Name string
Tags Tags
Aggregator {{.Name}}PointAggregator
Emitter {{.Name}}PointEmitter
Aggregator {{$k.Name}}PointAggregator
Emitter {{$v.Name}}PointEmitter
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []*{{$v.Name}}Point {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
// Create points by tags.
m := make(map[string]*{{.name}}ReducePoint)
m := make(map[string]*{{$k.name}}Reduce{{.Name}}Point)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
@ -680,7 +680,7 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
rp := m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &{{.name}}ReducePoint{
rp = &{{$k.name}}Reduce{{.Name}}Point{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
@ -698,7 +698,7 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*{{.Name}}Point, len(m))
a := make([]*{{$v.Name}}Point, len(m))
for i, k := range keys {
rp := m[k]
p := rp.Emitter.Emit()
@ -710,20 +710,21 @@ func (itr *{{.name}}ReduceIterator) reduce() []*{{.Name}}Point {
return a
}
{{end}}
// {{.name}}ReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type {{.name}}ReduceSliceIterator struct {
input *buf{{.Name}}Iterator
fn {{.name}}ReduceSliceFunc
// {{$k.name}}ReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type {{$k.name}}ReduceSliceIterator struct {
input *buf{{$k.Name}}Iterator
fn {{$k.name}}ReduceSliceFunc
opt IteratorOptions
points []{{.Name}}Point
points []{{$k.Name}}Point
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}ReduceSliceIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}ReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}ReduceSliceIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}ReduceSliceIterator) Next() *{{$k.Name}}Point {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
@ -740,7 +741,7 @@ func (itr *{{.name}}ReduceSliceIterator) Next() *{{.Name}}Point {
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
func (itr *{{$k.name}}ReduceSliceIterator) reduce() []{{$k.Name}}Point {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
@ -753,7 +754,7 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
groups := make(map[string]struct {
name string
tags Tags
points []{{.Name}}Point
points []{{$k.Name}}Point
})
for {
// Read next point.
@ -775,7 +776,7 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
}
// Reduce each set into a set of values.
results := make(map[string][]{{.Name}}Point)
results := make(map[string][]{{$k.Name}}Point)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
@ -798,7 +799,7 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]{{.Name}}Point, 0, len(results))
a := make([]{{$k.Name}}Point, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
@ -808,21 +809,21 @@ func (itr *{{.name}}ReduceSliceIterator) reduce() []{{.Name}}Point {
return a
}
// {{.name}}ReduceSliceFunc is the function called by a {{.Name}}Point slice reducer.
type {{.name}}ReduceSliceFunc func(a []{{.Name}}Point, opt *reduceOptions) []{{.Name}}Point
// {{$k.name}}ReduceSliceFunc is the function called by a {{$k.Name}}Point slice reducer.
type {{$k.name}}ReduceSliceFunc func(a []{{$k.Name}}Point, opt *reduceOptions) []{{$k.Name}}Point
// {{.name}}ReduceIterator executes a function to modify an existing point for every
// {{$k.name}}ReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type {{.name}}TransformIterator struct {
input {{.Name}}Iterator
fn {{.name}}TransformFunc
type {{$k.name}}TransformIterator struct {
input {{$k.Name}}Iterator
fn {{$k.name}}TransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}TransformIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}TransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}TransformIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}TransformIterator) Next() *{{$k.Name}}Point {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
@ -830,23 +831,23 @@ func (itr *{{.name}}TransformIterator) Next() *{{.Name}}Point {
return p
}
// {{.name}}TransformFunc creates or modifies a point.
// {{$k.name}}TransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type {{.name}}TransformFunc func(p *{{.Name}}Point) *{{.Name}}Point
type {{$k.name}}TransformFunc func(p *{{$k.Name}}Point) *{{$k.Name}}Point
// {{.name}}ReduceIterator executes a function to modify an existing point for every
// {{$k.name}}ReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type {{.name}}BoolTransformIterator struct {
input {{.Name}}Iterator
fn {{.name}}BoolTransformFunc
type {{$k.name}}BoolTransformIterator struct {
input {{$k.Name}}Iterator
fn {{$k.name}}BoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}BoolTransformIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}BoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{.name}}BoolTransformIterator) Next() *BooleanPoint {
func (itr *{{$k.name}}BoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
@ -854,33 +855,33 @@ func (itr *{{.name}}BoolTransformIterator) Next() *BooleanPoint {
return nil
}
// {{.name}}BoolTransformFunc creates or modifies a point.
// {{$k.name}}BoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type {{.name}}BoolTransformFunc func(p *{{.Name}}Point) *BooleanPoint
type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint
// {{.name}}DedupeIterator only outputs unique points.
// {{$k.name}}DedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type {{.name}}DedupeIterator struct {
input {{.Name}}Iterator
type {{$k.name}}DedupeIterator struct {
input {{$k.Name}}Iterator
m map[string]struct{} // lookup of points already sent
}
// new{{.Name}}DedupeIterator returns a new instance of {{.name}}DedupeIterator.
func new{{.Name}}DedupeIterator(input {{.Name}}Iterator) *{{.name}}DedupeIterator {
return &{{.name}}DedupeIterator{
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
return &{{$k.name}}DedupeIterator{
input: input,
m: make(map[string]struct{}),
}
}
// Close closes the iterator and all child iterators.
func (itr *{{.name}}DedupeIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator.
func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}DedupeIterator) Next() *{{$k.Name}}Point {
for {
// Read next point.
p := itr.input.Next()
@ -889,7 +890,7 @@ func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point {
}
// Serialize to bytes to store in lookup.
buf, err := proto.Marshal(encode{{.Name}}Point(p))
buf, err := proto.Marshal(encode{{$k.Name}}Point(p))
if err != nil {
log.Println("error marshaling dedupe point:", err)
continue
@ -906,24 +907,24 @@ func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point {
}
}
// {{.name}}ReaderIterator represents an iterator that streams from a reader.
type {{.name}}ReaderIterator struct {
// {{$k.name}}ReaderIterator represents an iterator that streams from a reader.
type {{$k.name}}ReaderIterator struct {
r io.Reader
dec *{{.Name}}PointDecoder
first *{{.Name}}Point
dec *{{$k.Name}}PointDecoder
first *{{$k.Name}}Point
}
// new{{.Name}}ReaderIterator returns a new instance of {{.name}}ReaderIterator.
func new{{.Name}}ReaderIterator(r io.Reader, first *{{.Name}}Point) *{{.name}}ReaderIterator {
return &{{.name}}ReaderIterator{
// new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator.
func new{{$k.Name}}ReaderIterator(r io.Reader, first *{{$k.Name}}Point) *{{$k.name}}ReaderIterator {
return &{{$k.name}}ReaderIterator{
r: r,
dec: New{{.Name}}PointDecoder(r),
dec: New{{$k.Name}}PointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *{{.name}}ReaderIterator) Close() error {
func (itr *{{$k.name}}ReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
@ -931,7 +932,7 @@ func (itr *{{.name}}ReaderIterator) Close() error {
}
// Next returns the next point from the iterator.
func (itr *{{.name}}ReaderIterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}ReaderIterator) Next() *{{$k.Name}}Point {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
@ -942,8 +943,8 @@ func (itr *{{.name}}ReaderIterator) Next() *{{.Name}}Point {
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &{{.Name}}Point{}
if err := itr.dec.Decode{{.Name}}Point(p); err == io.EOF {
p := &{{$k.Name}}Point{}
if err := itr.dec.Decode{{$k.Name}}Point(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
@ -998,4 +999,4 @@ func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error
}
}
{{end}}
{{end}}{{end}}