From cac94a1fc7067cdaa8dcaf34cc5aeb12be41872e Mon Sep 17 00:00:00 2001 From: Tom Young Date: Sun, 6 Nov 2016 12:54:26 +0000 Subject: [PATCH 1/2] Add "integral" function to InfluxQL --- influxql/ast.go | 16 ++++- influxql/call_iterator.go | 20 ++++++ influxql/functions.go | 114 ++++++++++++++++++++++++++++++++ influxql/iterator.go | 10 +++ influxql/iterator_test.go | 9 +++ influxql/select.go | 9 +++ influxql/select_test.go | 132 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 309 insertions(+), 1 deletion(-) diff --git a/influxql/ast.go b/influxql/ast.go index 09e7f00104..5c38a7a9a3 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -2016,6 +2016,20 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { if err := s.validSampleAggr(expr); err != nil { return err } + case "integral": + if err := s.validSelectWithAggregate(); err != nil { + return err + } + if min, max, got := 1, 2, len(expr.Args); got > max || got < min { + return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got) + } + // If a duration arg is passed, make sure it's a duration + if len(expr.Args) == 2 { + // Second must be a duration .e.g (1h) + if _, ok := expr.Args[1].(*DurationLiteral); !ok { + return errors.New("second argument must be a duration") + } + } case "holt_winters", "holt_winters_with_fit": if exp, got := 3, len(expr.Args); got != exp { return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) @@ -4501,7 +4515,7 @@ func EvalType(expr Expr, sources Sources, typmap TypeMapper) DataType { return typ case *Call: switch expr.Name { - case "mean", "median": + case "mean", "median", "integral": return Float case "count": return Integer diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index 908cee06a9..f66239542c 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -1289,3 +1289,23 @@ func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input) } } + +// newIntegralIterator returns an iterator for operating on a integral() call. +func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) { + switch input := input.(type) { + case FloatIterator: + createFn := func() (FloatPointAggregator, FloatPointEmitter) { + fn := NewFloatIntegralReducer(interval) + return fn, fn + } + return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil + case IntegerIterator: + createFn := func() (IntegerPointAggregator, FloatPointEmitter) { + fn := NewIntegerIntegralReducer(interval) + return fn, fn + } + return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil + default: + return nil, fmt.Errorf("unsupported integral iterator type: %T", input) + } +} diff --git a/influxql/functions.go b/influxql/functions.go index a05c2d3c8b..87e47ddc40 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -757,3 +757,117 @@ func (r *FloatHoltWintersReducer) constrain(x []float64) { x[3] = 0 } } + +// FloatIntegralReducer calculates the time-integral of the aggregated points. +type FloatIntegralReducer struct { + count uint32 + interval Interval + sum float64 + prev FloatPoint +} + +// NewFloatIntegralReducer creates a new FloatIntegralReducer. +func NewFloatIntegralReducer(interval Interval) *FloatIntegralReducer { + return &FloatIntegralReducer{ + interval: interval, + prev: FloatPoint{Nil: true}, + } +} + +// AggregateFloat aggregates a point into the reducer. +func (r *FloatIntegralReducer) AggregateFloat(p *FloatPoint) { + + // If this is the first point, just save it + if r.prev.Nil { + r.prev = *p + r.count++ + return + } + + // If this point has the same timestamp as the previous one, + // use it but don't add anything to the integral. Effectively + // this means that we allow the curve we are integrating + // to have step changes in it, but who knows whether the points + // actually arrive in any particular order...? + if r.prev.Time == p.Time { + r.prev = *p + r.count++ + return + } + + // Normal operation: update the sum using the trapezium rule + elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration) + r.sum += 0.5 * (p.Value + r.prev.Value) * elapsed + r.prev = *p + r.count++ + return +} + +// Emit emits the time-integral of the aggregated points as a single point. +// InfluxQL convention dictates that outside a group-by-time clause we return +// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime +// and a higher level will change it to the start of the time group. +func (r *FloatIntegralReducer) Emit() []FloatPoint { + return []FloatPoint{{ + Time: ZeroTime, + Value: r.sum, + Aggregated: r.count, + }} +} + +// IntegerIntegralReducer calculates the time-integral of the aggregated points. +type IntegerIntegralReducer struct { + count uint32 + interval Interval + sum float64 + prev IntegerPoint +} + +// NewIntegerIntegralReducer creates a new IntegerIntegralReducer. +func NewIntegerIntegralReducer(interval Interval) *IntegerIntegralReducer { + return &IntegerIntegralReducer{ + interval: interval, + prev: IntegerPoint{Nil: true}, + } +} + +// AggregateInteger aggregates a point into the reducer. +func (r *IntegerIntegralReducer) AggregateInteger(p *IntegerPoint) { + + // If this is the first point, just save it + if r.prev.Nil { + r.prev = *p + r.count++ + return + } + + // If this point has the same timestamp as the previous one, + // use it but don't add anything to the integral. Effectively + // this means that we allow the curve we are integrating + // to have step changes in it, but who knows whether the points + // actually arrive in any particular order...? + if r.prev.Time == p.Time { + r.prev = *p + r.count++ + return + } + + // Normal operation: update the sum using the trapezium rule + elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration) + r.sum += 0.5 * float64(p.Value+r.prev.Value) * elapsed + r.prev = *p + r.count++ + return +} + +// Emit emits the time-integral of the aggregated points as a single FLOAT point +// InfluxQL convention dictates that outside a group-by-time clause we return +// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime +// and a higher level will change it to the start of the time group. +func (r *IntegerIntegralReducer) Emit() []FloatPoint { + return []FloatPoint{{ + Time: ZeroTime, + Value: r.sum, + Aggregated: r.count, + }} +} diff --git a/influxql/iterator.go b/influxql/iterator.go index cdaef7d966..292b77509d 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -919,6 +919,16 @@ func (opt IteratorOptions) ElapsedInterval() Interval { return Interval{Duration: time.Nanosecond} } +// IntegralInterval returns the time interval for the integral function. +func (opt IteratorOptions) IntegralInterval() Interval { + // Use the interval on the integral() call, if specified. + if expr, ok := opt.Expr.(*Call); ok && len(expr.Args) == 2 { + return Interval{Duration: expr.Args[1].(*DurationLiteral).Val} + } + + return Interval{Duration: time.Second} +} + // GetDimensions retrieves the dimensions for this query. func (opt IteratorOptions) GetDimensions() []string { if len(opt.GroupBy) > 0 { diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index dbebac6f85..480dce08f5 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -1181,6 +1181,15 @@ func TestIteratorOptions_ElapsedInterval_Call(t *testing.T) { } } +func TestIteratorOptions_IntegralInterval_Default(t *testing.T) { + opt := influxql.IteratorOptions{} + expected := influxql.Interval{Duration: time.Second} + actual := opt.IntegralInterval() + if actual != expected { + t.Errorf("expected default integral interval to be %v, got %v", expected, actual) + } +} + // Ensure iterator options can be marshaled to and from a binary format. func TestIteratorOptions_MarshalBinary(t *testing.T) { opt := &influxql.IteratorOptions{ diff --git a/influxql/select.go b/influxql/select.go index 76f4af073c..84104a48a8 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -951,6 +951,15 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { percentile = float64(arg.Val) } return newPercentileIterator(input, opt, percentile) + case "integral": + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) + if err != nil { + return nil, err + } + interval := opt.IntegralInterval() + return newIntegralIterator(input, opt, interval) default: return nil, fmt.Errorf("unsupported call: %s", expr.Name) } diff --git a/influxql/select_test.go b/influxql/select_test.go index 62b7aafd2d..098edae61f 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2930,6 +2930,138 @@ func TestSelect_Elapsed_Boolean(t *testing.T) { } } +func TestSelect_Integral_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 10 * Second, Value: 20}, + {Name: "cpu", Time: 15 * Second, Value: 10}, + {Name: "cpu", Time: 20 * Second, Value: 0}, + {Name: "cpu", Time: 30 * Second, Value: -10}, + }}, nil + } + + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50, Aggregated: 4}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Integral_Float_GroupByTime(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 10 * Second, Value: 20}, + {Name: "cpu", Time: 15 * Second, Value: 10}, + {Name: "cpu", Time: 20 * Second, Value: 0}, + {Name: "cpu", Time: 30 * Second, Value: -10}, + }}, nil + } + + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu WHERE time > 0s AND time < 60s GROUP BY time(20s)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 75, Aggregated: 2}}, + {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: -50, Aggregated: 2}}, + {&influxql.FloatPoint{Name: "cpu", Time: 40 * Second, Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Integral_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 5 * Second, Value: 10}, + {Name: "cpu", Time: 10 * Second, Value: 0}, + {Name: "cpu", Time: 20 * Second, Value: -10}, + }}, nil + } + + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50, Aggregated: 4}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Integral_Duplicate_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 5 * Second, Value: 10}, + {Name: "cpu", Time: 5 * Second, Value: 30}, + {Name: "cpu", Time: 10 * Second, Value: 40}, + }}, nil + } + + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 250, Aggregated: 4}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Integral_Duplicate_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 5 * Second, Value: 10}, + {Name: "cpu", Time: 5 * Second, Value: 30}, + {Name: "cpu", Time: 10 * Second, Value: 40}, + }}, nil + } + + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value, 2s) FROM cpu`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 125, Aggregated: 4}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + func TestSelect_MovingAverage_Float(t *testing.T) { var ic IteratorCreator ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { From 2ea805c9283db322ac5d269a814b5470faf16239 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 23 Mar 2017 15:18:03 -0500 Subject: [PATCH 2/2] Interpolate between different intervals to find the whole area under the curve --- CHANGELOG.md | 1 + influxql/call_iterator.go | 8 +- influxql/functions.go | 165 +++++++++--- influxql/internal/internal.pb.go | 2 +- influxql/iterator.gen.go | 432 +++++++++++++++++++++++++++++-- influxql/iterator.gen.go.tmpl | 27 +- influxql/select.go | 18 +- influxql/select_test.go | 40 ++- 8 files changed, 622 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98cb6edd79..93f3ea52a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS - [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL. - [#6541](https://github.com/influxdata/influxdb/issues/6541): Support timezone offsets for queries. +- [#8194](https://github.com/influxdata/influxdb/pull/8194): Add "integral" function to InfluxQL. ### Bugfixes diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index f66239542c..75b59e2dfb 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -1295,16 +1295,16 @@ func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) switch input := input.(type) { case FloatIterator: createFn := func() (FloatPointAggregator, FloatPointEmitter) { - fn := NewFloatIntegralReducer(interval) + fn := NewFloatIntegralReducer(interval, opt) return fn, fn } - return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil + return newFloatStreamFloatIterator(input, createFn, opt), nil case IntegerIterator: createFn := func() (IntegerPointAggregator, FloatPointEmitter) { - fn := NewIntegerIntegralReducer(interval) + fn := NewIntegerIntegralReducer(interval, opt) return fn, fn } - return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil + return newIntegerStreamFloatIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported integral iterator type: %T", input) } diff --git a/influxql/functions.go b/influxql/functions.go index 87e47ddc40..51f6542da1 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -760,47 +760,77 @@ func (r *FloatHoltWintersReducer) constrain(x []float64) { // FloatIntegralReducer calculates the time-integral of the aggregated points. type FloatIntegralReducer struct { - count uint32 interval Interval sum float64 prev FloatPoint + window struct { + start int64 + end int64 + } + ch chan FloatPoint + opt IteratorOptions } // NewFloatIntegralReducer creates a new FloatIntegralReducer. -func NewFloatIntegralReducer(interval Interval) *FloatIntegralReducer { +func NewFloatIntegralReducer(interval Interval, opt IteratorOptions) *FloatIntegralReducer { return &FloatIntegralReducer{ interval: interval, prev: FloatPoint{Nil: true}, + ch: make(chan FloatPoint, 1), + opt: opt, } } // AggregateFloat aggregates a point into the reducer. func (r *FloatIntegralReducer) AggregateFloat(p *FloatPoint) { - // If this is the first point, just save it if r.prev.Nil { r.prev = *p - r.count++ + if !r.opt.Interval.IsZero() { + // Record the end of the time interval. + // We do not care for whether the last number is inclusive or exclusive + // because we treat both the same for the involved math. + if r.opt.Ascending { + r.window.start, r.window.end = r.opt.Window(p.Time) + } else { + r.window.end, r.window.start = r.opt.Window(p.Time) + } + } return } // If this point has the same timestamp as the previous one, - // use it but don't add anything to the integral. Effectively - // this means that we allow the curve we are integrating - // to have step changes in it, but who knows whether the points - // actually arrive in any particular order...? + // skip the point. Points sent into this reducer are expected + // to be fed in order. if r.prev.Time == p.Time { r.prev = *p - r.count++ return + } else if !r.opt.Interval.IsZero() && ((r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end)) { + // If our previous time is not equal to the window, we need to + // interpolate the area at the end of this interval. + if r.prev.Time != r.window.end { + value := linearFloat(r.window.end, r.prev.Time, p.Time, r.prev.Value, p.Value) + elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration) + r.sum += 0.5 * (value + r.prev.Value) * elapsed + + r.prev.Value = value + r.prev.Time = r.window.end + } + + // Emit the current point through the channel and then clear it. + r.ch <- FloatPoint{Time: r.window.start, Value: r.sum} + if r.opt.Ascending { + r.window.start, r.window.end = r.opt.Window(p.Time) + } else { + r.window.end, r.window.start = r.opt.Window(p.Time) + } + r.sum = 0.0 } // Normal operation: update the sum using the trapezium rule elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration) r.sum += 0.5 * (p.Value + r.prev.Value) * elapsed r.prev = *p - r.count++ - return } // Emit emits the time-integral of the aggregated points as a single point. @@ -808,56 +838,108 @@ func (r *FloatIntegralReducer) AggregateFloat(p *FloatPoint) { // a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime // and a higher level will change it to the start of the time group. func (r *FloatIntegralReducer) Emit() []FloatPoint { - return []FloatPoint{{ - Time: ZeroTime, - Value: r.sum, - Aggregated: r.count, - }} + select { + case pt, ok := <-r.ch: + if !ok { + return nil + } + return []FloatPoint{pt} + default: + return nil + } +} + +// Close flushes any in progress points to ensure any remaining points are +// emitted. +func (r *FloatIntegralReducer) Close() error { + // If our last point is at the start time, then discard this point since + // there is no area within this bucket. Otherwise, send off what we + // currently have as the final point. + if !r.prev.Nil && r.prev.Time != r.window.start { + r.ch <- FloatPoint{Time: r.window.start, Value: r.sum} + } + close(r.ch) + return nil } // IntegerIntegralReducer calculates the time-integral of the aggregated points. type IntegerIntegralReducer struct { - count uint32 interval Interval sum float64 prev IntegerPoint + window struct { + start int64 + end int64 + } + ch chan FloatPoint + opt IteratorOptions } // NewIntegerIntegralReducer creates a new IntegerIntegralReducer. -func NewIntegerIntegralReducer(interval Interval) *IntegerIntegralReducer { +func NewIntegerIntegralReducer(interval Interval, opt IteratorOptions) *IntegerIntegralReducer { return &IntegerIntegralReducer{ interval: interval, prev: IntegerPoint{Nil: true}, + ch: make(chan FloatPoint, 1), + opt: opt, } } // AggregateInteger aggregates a point into the reducer. func (r *IntegerIntegralReducer) AggregateInteger(p *IntegerPoint) { - // If this is the first point, just save it if r.prev.Nil { r.prev = *p - r.count++ + + // Record the end of the time interval. + // We do not care for whether the last number is inclusive or exclusive + // because we treat both the same for the involved math. + if r.opt.Ascending { + r.window.start, r.window.end = r.opt.Window(p.Time) + } else { + r.window.end, r.window.start = r.opt.Window(p.Time) + } + + // If we see the minimum allowable time, set the time to zero so we don't + // break the default returned time for aggregate queries without times. + if r.window.start == MinTime { + r.window.start = 0 + } return } // If this point has the same timestamp as the previous one, - // use it but don't add anything to the integral. Effectively - // this means that we allow the curve we are integrating - // to have step changes in it, but who knows whether the points - // actually arrive in any particular order...? + // skip the point. Points sent into this reducer are expected + // to be fed in order. + value := float64(p.Value) if r.prev.Time == p.Time { r.prev = *p - r.count++ return + } else if (r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end) { + // If our previous time is not equal to the window, we need to + // interpolate the area at the end of this interval. + if r.prev.Time != r.window.end { + value = linearFloat(r.window.end, r.prev.Time, p.Time, float64(r.prev.Value), value) + elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration) + r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed + + r.prev.Time = r.window.end + } + + // Emit the current point through the channel and then clear it. + r.ch <- FloatPoint{Time: r.window.start, Value: r.sum} + if r.opt.Ascending { + r.window.start, r.window.end = r.opt.Window(p.Time) + } else { + r.window.end, r.window.start = r.opt.Window(p.Time) + } + r.sum = 0.0 } // Normal operation: update the sum using the trapezium rule elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration) - r.sum += 0.5 * float64(p.Value+r.prev.Value) * elapsed + r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed r.prev = *p - r.count++ - return } // Emit emits the time-integral of the aggregated points as a single FLOAT point @@ -865,9 +947,26 @@ func (r *IntegerIntegralReducer) AggregateInteger(p *IntegerPoint) { // a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime // and a higher level will change it to the start of the time group. func (r *IntegerIntegralReducer) Emit() []FloatPoint { - return []FloatPoint{{ - Time: ZeroTime, - Value: r.sum, - Aggregated: r.count, - }} + select { + case pt, ok := <-r.ch: + if !ok { + return nil + } + return []FloatPoint{pt} + default: + return nil + } +} + +// Close flushes any in progress points to ensure any remaining points are +// emitted. +func (r *IntegerIntegralReducer) Close() error { + // If our last point is at the start time, then discard this point since + // there is no area within this bucket. Otherwise, send off what we + // currently have as the final point. + if !r.prev.Nil && r.prev.Time != r.window.start { + r.ch <- FloatPoint{Time: r.window.start, Value: r.sum} + } + close(r.ch) + return nil } diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index ce8d9c61f2..8134e1b917 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -514,7 +514,7 @@ func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorIntern var fileDescriptorInternal = []byte{ // 737 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x54, 0xdd, 0x6a, 0xdb, 0x4a, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x54, 0xdd, 0x6a, 0xdb, 0x4a, 0x10, 0x46, 0x92, 0xe5, 0x58, 0xeb, 0xf8, 0x24, 0x67, 0x4f, 0x7e, 0x96, 0xc3, 0xe1, 0x54, 0xe8, 0x4a, 0x50, 0xea, 0x40, 0x6e, 0x0b, 0x05, 0xa7, 0x49, 0x8a, 0x21, 0x71, 0xc2, 0x2a, 0xe4, 0x7e, 0x6b, 0x8d, 0xc5, 0x82, 0x2c, 0xb9, 0xab, 0x55, 0x71, 0x1e, 0xa5, 0xcf, 0xd0, 0x87, 0xe9, 0xab, diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 19355ca03b..26523f7f16 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -1197,7 +1197,32 @@ func (itr *floatStreamFloatIterator) reduce() ([]FloatPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []FloatPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -1565,7 +1590,32 @@ func (itr *floatStreamIntegerIterator) reduce() ([]IntegerPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []IntegerPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -1937,7 +1987,32 @@ func (itr *floatStreamStringIterator) reduce() ([]StringPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []StringPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -2309,7 +2384,32 @@ func (itr *floatStreamBooleanIterator) reduce() ([]BooleanPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []BooleanPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -3870,7 +3970,32 @@ func (itr *integerStreamFloatIterator) reduce() ([]FloatPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []FloatPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -4242,7 +4367,32 @@ func (itr *integerStreamIntegerIterator) reduce() ([]IntegerPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []IntegerPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -4610,7 +4760,32 @@ func (itr *integerStreamStringIterator) reduce() ([]StringPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []StringPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -4982,7 +5157,32 @@ func (itr *integerStreamBooleanIterator) reduce() ([]BooleanPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []BooleanPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -6529,7 +6729,32 @@ func (itr *stringStreamFloatIterator) reduce() ([]FloatPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []FloatPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -6901,7 +7126,32 @@ func (itr *stringStreamIntegerIterator) reduce() ([]IntegerPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []IntegerPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -7273,7 +7523,32 @@ func (itr *stringStreamStringIterator) reduce() ([]StringPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []StringPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -7641,7 +7916,32 @@ func (itr *stringStreamBooleanIterator) reduce() ([]BooleanPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []BooleanPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -9188,7 +9488,32 @@ func (itr *booleanStreamFloatIterator) reduce() ([]FloatPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []FloatPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -9560,7 +9885,32 @@ func (itr *booleanStreamIntegerIterator) reduce() ([]IntegerPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []IntegerPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -9932,7 +10282,32 @@ func (itr *booleanStreamStringIterator) reduce() ([]StringPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []StringPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue @@ -10304,7 +10679,32 @@ func (itr *booleanStreamBooleanIterator) reduce() ([]BooleanPoint, error) { for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []BooleanPoint + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index eb16d59788..048ae3e30e 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -1198,7 +1198,32 @@ func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, e for { // Read next point. curr, err := itr.input.Next() - if curr == nil || err != nil { + if curr == nil { + // Close all of the aggregators to flush any remaining points to emit. + var points []{{$v.Name}}Point + for _, rp := range itr.m { + if aggregator, ok := rp.Aggregator.(io.Closer); ok { + if err := aggregator.Close(); err != nil { + return nil, err + } + + pts := rp.Emitter.Emit() + if len(pts) == 0 { + continue + } + + for i := range pts { + pts[i].Name = rp.Name + pts[i].Tags = rp.Tags + } + points = append(points, pts...) + } + } + + // Eliminate the aggregators and emitters. + itr.m = nil + return points, nil + } else if err != nil { return nil, err } else if curr.Nil { continue diff --git a/influxql/select.go b/influxql/select.go index 84104a48a8..9c2cc10cf1 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -794,6 +794,15 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { return nil, err } return newCumulativeSumIterator(input, opt) + case "integral": + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) + if err != nil { + return nil, err + } + interval := opt.IntegralInterval() + return newIntegralIterator(input, opt, interval) } itr, err := func() (Iterator, error) { @@ -951,15 +960,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { percentile = float64(arg.Val) } return newPercentileIterator(input, opt, percentile) - case "integral": - opt := b.opt - opt.Ordered = true - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) - if err != nil { - return nil, err - } - interval := opt.IntegralInterval() - return newIntegralIterator(input, opt, interval) default: return nil, fmt.Errorf("unsupported call: %s", expr.Name) } diff --git a/influxql/select_test.go b/influxql/select_test.go index 098edae61f..9d4882dce5 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2950,7 +2950,7 @@ func TestSelect_Integral_Float(t *testing.T) { } else if a, err := Iterators(itrs).ReadAll(); err != nil { t.Fatalf("unexpected error: %s", err) } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50, Aggregated: 4}}, + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50}}, }) { t.Fatalf("unexpected points: %s", spew.Sdump(a)) } @@ -2976,9 +2976,35 @@ func TestSelect_Integral_Float_GroupByTime(t *testing.T) { } else if a, err := Iterators(itrs).ReadAll(); err != nil { t.Fatalf("unexpected error: %s", err) } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 75, Aggregated: 2}}, - {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: -50, Aggregated: 2}}, - {&influxql.FloatPoint{Name: "cpu", Time: 40 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 100}}, + {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: -50}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Integral_Float_InterpolateGroupByTime(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 10 * Second, Value: 20}, + {Name: "cpu", Time: 15 * Second, Value: 10}, + {Name: "cpu", Time: 25 * Second, Value: 0}, + {Name: "cpu", Time: 30 * Second, Value: -10}, + }}, nil + } + + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu WHERE time > 0s AND time < 60s GROUP BY time(20s)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 112.5}}, + {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: -12.5}}, }) { t.Fatalf("unexpected points: %s", spew.Sdump(a)) } @@ -3004,7 +3030,7 @@ func TestSelect_Integral_Integer(t *testing.T) { } else if a, err := Iterators(itrs).ReadAll(); err != nil { t.Fatalf("unexpected error: %s", err) } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50, Aggregated: 4}}, + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50}}, }) { t.Fatalf("unexpected points: %s", spew.Sdump(a)) } @@ -3030,7 +3056,7 @@ func TestSelect_Integral_Duplicate_Float(t *testing.T) { } else if a, err := Iterators(itrs).ReadAll(); err != nil { t.Fatalf("unexpected error: %s", err) } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 250, Aggregated: 4}}, + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 250}}, }) { t.Fatalf("unexpected points: %s", spew.Sdump(a)) } @@ -3056,7 +3082,7 @@ func TestSelect_Integral_Duplicate_Integer(t *testing.T) { } else if a, err := Iterators(itrs).ReadAll(); err != nil { t.Fatalf("unexpected error: %s", err) } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 125, Aggregated: 4}}, + {&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 125}}, }) { t.Fatalf("unexpected points: %s", spew.Sdump(a)) }