Merge pull request #8194 from influxdata/js-integral-function

Add "integral" function to InfluxQL
pull/8235/head
Jonathan A. Sternberg 2017-03-30 18:24:46 -05:00 committed by GitHub
commit a221e32291
11 changed files with 878 additions and 19 deletions

View File

@ -12,6 +12,7 @@
- [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS
- [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL.
- [#6541](https://github.com/influxdata/influxdb/issues/6541): Support timezone offsets for queries.
- [#8194](https://github.com/influxdata/influxdb/pull/8194): Add "integral" function to InfluxQL.
### Bugfixes

View File

@ -2016,6 +2016,20 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
if err := s.validSampleAggr(expr); err != nil {
return err
}
case "integral":
if err := s.validSelectWithAggregate(); err != nil {
return err
}
if min, max, got := 1, 2, len(expr.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got)
}
// If a duration arg is passed, make sure it's a duration
if len(expr.Args) == 2 {
// Second must be a duration .e.g (1h)
if _, ok := expr.Args[1].(*DurationLiteral); !ok {
return errors.New("second argument must be a duration")
}
}
case "holt_winters", "holt_winters_with_fit":
if exp, got := 3, len(expr.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
@ -4501,7 +4515,7 @@ func EvalType(expr Expr, sources Sources, typmap TypeMapper) DataType {
return typ
case *Call:
switch expr.Name {
case "mean", "median":
case "mean", "median", "integral":
return Float
case "count":
return Integer

View File

@ -1289,3 +1289,23 @@ func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator,
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
}
}
// newIntegralIterator returns an iterator for operating on a integral() call.
func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatIntegralReducer(interval, opt)
return fn, fn
}
return newFloatStreamFloatIterator(input, createFn, opt), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerIntegralReducer(interval, opt)
return fn, fn
}
return newIntegerStreamFloatIterator(input, createFn, opt), nil
default:
return nil, fmt.Errorf("unsupported integral iterator type: %T", input)
}
}

View File

@ -757,3 +757,216 @@ func (r *FloatHoltWintersReducer) constrain(x []float64) {
x[3] = 0
}
}
// FloatIntegralReducer calculates the time-integral of the aggregated points.
type FloatIntegralReducer struct {
interval Interval
sum float64
prev FloatPoint
window struct {
start int64
end int64
}
ch chan FloatPoint
opt IteratorOptions
}
// NewFloatIntegralReducer creates a new FloatIntegralReducer.
func NewFloatIntegralReducer(interval Interval, opt IteratorOptions) *FloatIntegralReducer {
return &FloatIntegralReducer{
interval: interval,
prev: FloatPoint{Nil: true},
ch: make(chan FloatPoint, 1),
opt: opt,
}
}
// AggregateFloat aggregates a point into the reducer.
func (r *FloatIntegralReducer) AggregateFloat(p *FloatPoint) {
// If this is the first point, just save it
if r.prev.Nil {
r.prev = *p
if !r.opt.Interval.IsZero() {
// Record the end of the time interval.
// We do not care for whether the last number is inclusive or exclusive
// because we treat both the same for the involved math.
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}
}
return
}
// If this point has the same timestamp as the previous one,
// skip the point. Points sent into this reducer are expected
// to be fed in order.
if r.prev.Time == p.Time {
r.prev = *p
return
} else if !r.opt.Interval.IsZero() && ((r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end)) {
// If our previous time is not equal to the window, we need to
// interpolate the area at the end of this interval.
if r.prev.Time != r.window.end {
value := linearFloat(r.window.end, r.prev.Time, p.Time, r.prev.Value, p.Value)
elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (value + r.prev.Value) * elapsed
r.prev.Value = value
r.prev.Time = r.window.end
}
// Emit the current point through the channel and then clear it.
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}
r.sum = 0.0
}
// Normal operation: update the sum using the trapezium rule
elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (p.Value + r.prev.Value) * elapsed
r.prev = *p
}
// Emit emits the time-integral of the aggregated points as a single point.
// InfluxQL convention dictates that outside a group-by-time clause we return
// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime
// and a higher level will change it to the start of the time group.
func (r *FloatIntegralReducer) Emit() []FloatPoint {
select {
case pt, ok := <-r.ch:
if !ok {
return nil
}
return []FloatPoint{pt}
default:
return nil
}
}
// Close flushes any in progress points to ensure any remaining points are
// emitted.
func (r *FloatIntegralReducer) Close() error {
// If our last point is at the start time, then discard this point since
// there is no area within this bucket. Otherwise, send off what we
// currently have as the final point.
if !r.prev.Nil && r.prev.Time != r.window.start {
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
}
close(r.ch)
return nil
}
// IntegerIntegralReducer calculates the time-integral of the aggregated points.
type IntegerIntegralReducer struct {
interval Interval
sum float64
prev IntegerPoint
window struct {
start int64
end int64
}
ch chan FloatPoint
opt IteratorOptions
}
// NewIntegerIntegralReducer creates a new IntegerIntegralReducer.
func NewIntegerIntegralReducer(interval Interval, opt IteratorOptions) *IntegerIntegralReducer {
return &IntegerIntegralReducer{
interval: interval,
prev: IntegerPoint{Nil: true},
ch: make(chan FloatPoint, 1),
opt: opt,
}
}
// AggregateInteger aggregates a point into the reducer.
func (r *IntegerIntegralReducer) AggregateInteger(p *IntegerPoint) {
// If this is the first point, just save it
if r.prev.Nil {
r.prev = *p
// Record the end of the time interval.
// We do not care for whether the last number is inclusive or exclusive
// because we treat both the same for the involved math.
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if r.window.start == MinTime {
r.window.start = 0
}
return
}
// If this point has the same timestamp as the previous one,
// skip the point. Points sent into this reducer are expected
// to be fed in order.
value := float64(p.Value)
if r.prev.Time == p.Time {
r.prev = *p
return
} else if (r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end) {
// If our previous time is not equal to the window, we need to
// interpolate the area at the end of this interval.
if r.prev.Time != r.window.end {
value = linearFloat(r.window.end, r.prev.Time, p.Time, float64(r.prev.Value), value)
elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed
r.prev.Time = r.window.end
}
// Emit the current point through the channel and then clear it.
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}
r.sum = 0.0
}
// Normal operation: update the sum using the trapezium rule
elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed
r.prev = *p
}
// Emit emits the time-integral of the aggregated points as a single FLOAT point
// InfluxQL convention dictates that outside a group-by-time clause we return
// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime
// and a higher level will change it to the start of the time group.
func (r *IntegerIntegralReducer) Emit() []FloatPoint {
select {
case pt, ok := <-r.ch:
if !ok {
return nil
}
return []FloatPoint{pt}
default:
return nil
}
}
// Close flushes any in progress points to ensure any remaining points are
// emitted.
func (r *IntegerIntegralReducer) Close() error {
// If our last point is at the start time, then discard this point since
// there is no area within this bucket. Otherwise, send off what we
// currently have as the final point.
if !r.prev.Nil && r.prev.Time != r.window.start {
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
}
close(r.ch)
return nil
}

View File

@ -514,7 +514,7 @@ func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorIntern
var fileDescriptorInternal = []byte{
// 737 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x54, 0xdd, 0x6a, 0xdb, 0x4a,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x54, 0xdd, 0x6a, 0xdb, 0x4a,
0x10, 0x46, 0x92, 0xe5, 0x58, 0xeb, 0xf8, 0x24, 0x67, 0x4f, 0x7e, 0x96, 0xc3, 0xe1, 0x54, 0xe8,
0x4a, 0x50, 0xea, 0x40, 0x6e, 0x0b, 0x05, 0xa7, 0x49, 0x8a, 0x21, 0x71, 0xc2, 0x2a, 0xe4, 0x7e,
0x6b, 0x8d, 0xc5, 0x82, 0x2c, 0xb9, 0xab, 0x55, 0x71, 0x1e, 0xa5, 0xcf, 0xd0, 0x87, 0xe9, 0xab,

View File

@ -1197,7 +1197,32 @@ func (itr *floatStreamFloatIterator) reduce() ([]FloatPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []FloatPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -1565,7 +1590,32 @@ func (itr *floatStreamIntegerIterator) reduce() ([]IntegerPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []IntegerPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -1937,7 +1987,32 @@ func (itr *floatStreamStringIterator) reduce() ([]StringPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []StringPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -2309,7 +2384,32 @@ func (itr *floatStreamBooleanIterator) reduce() ([]BooleanPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []BooleanPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -3870,7 +3970,32 @@ func (itr *integerStreamFloatIterator) reduce() ([]FloatPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []FloatPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -4242,7 +4367,32 @@ func (itr *integerStreamIntegerIterator) reduce() ([]IntegerPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []IntegerPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -4610,7 +4760,32 @@ func (itr *integerStreamStringIterator) reduce() ([]StringPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []StringPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -4982,7 +5157,32 @@ func (itr *integerStreamBooleanIterator) reduce() ([]BooleanPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []BooleanPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -6529,7 +6729,32 @@ func (itr *stringStreamFloatIterator) reduce() ([]FloatPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []FloatPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -6901,7 +7126,32 @@ func (itr *stringStreamIntegerIterator) reduce() ([]IntegerPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []IntegerPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -7273,7 +7523,32 @@ func (itr *stringStreamStringIterator) reduce() ([]StringPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []StringPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -7641,7 +7916,32 @@ func (itr *stringStreamBooleanIterator) reduce() ([]BooleanPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []BooleanPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -9188,7 +9488,32 @@ func (itr *booleanStreamFloatIterator) reduce() ([]FloatPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []FloatPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -9560,7 +9885,32 @@ func (itr *booleanStreamIntegerIterator) reduce() ([]IntegerPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []IntegerPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -9932,7 +10282,32 @@ func (itr *booleanStreamStringIterator) reduce() ([]StringPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []StringPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue
@ -10304,7 +10679,32 @@ func (itr *booleanStreamBooleanIterator) reduce() ([]BooleanPoint, error) {
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []BooleanPoint
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue

View File

@ -1198,7 +1198,32 @@ func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, e
for {
// Read next point.
curr, err := itr.input.Next()
if curr == nil || err != nil {
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []{{$v.Name}}Point
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
return nil, err
} else if curr.Nil {
continue

View File

@ -919,6 +919,16 @@ func (opt IteratorOptions) ElapsedInterval() Interval {
return Interval{Duration: time.Nanosecond}
}
// IntegralInterval returns the time interval for the integral function.
func (opt IteratorOptions) IntegralInterval() Interval {
// Use the interval on the integral() call, if specified.
if expr, ok := opt.Expr.(*Call); ok && len(expr.Args) == 2 {
return Interval{Duration: expr.Args[1].(*DurationLiteral).Val}
}
return Interval{Duration: time.Second}
}
// GetDimensions retrieves the dimensions for this query.
func (opt IteratorOptions) GetDimensions() []string {
if len(opt.GroupBy) > 0 {

View File

@ -1181,6 +1181,15 @@ func TestIteratorOptions_ElapsedInterval_Call(t *testing.T) {
}
}
func TestIteratorOptions_IntegralInterval_Default(t *testing.T) {
opt := influxql.IteratorOptions{}
expected := influxql.Interval{Duration: time.Second}
actual := opt.IntegralInterval()
if actual != expected {
t.Errorf("expected default integral interval to be %v, got %v", expected, actual)
}
}
// Ensure iterator options can be marshaled to and from a binary format.
func TestIteratorOptions_MarshalBinary(t *testing.T) {
opt := &influxql.IteratorOptions{

View File

@ -794,6 +794,15 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
return nil, err
}
return newCumulativeSumIterator(input, opt)
case "integral":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}
interval := opt.IntegralInterval()
return newIntegralIterator(input, opt, interval)
}
itr, err := func() (Iterator, error) {

View File

@ -2930,6 +2930,164 @@ func TestSelect_Elapsed_Boolean(t *testing.T) {
}
}
func TestSelect_Integral_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 10 * Second, Value: 20},
{Name: "cpu", Time: 15 * Second, Value: 10},
{Name: "cpu", Time: 20 * Second, Value: 0},
{Name: "cpu", Time: 30 * Second, Value: -10},
}}, nil
}
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Integral_Float_GroupByTime(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 10 * Second, Value: 20},
{Name: "cpu", Time: 15 * Second, Value: 10},
{Name: "cpu", Time: 20 * Second, Value: 0},
{Name: "cpu", Time: 30 * Second, Value: -10},
}}, nil
}
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu WHERE time > 0s AND time < 60s GROUP BY time(20s)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 100}},
{&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: -50}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Integral_Float_InterpolateGroupByTime(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 10 * Second, Value: 20},
{Name: "cpu", Time: 15 * Second, Value: 10},
{Name: "cpu", Time: 25 * Second, Value: 0},
{Name: "cpu", Time: 30 * Second, Value: -10},
}}, nil
}
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu WHERE time > 0s AND time < 60s GROUP BY time(20s)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 112.5}},
{&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: -12.5}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Integral_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 5 * Second, Value: 10},
{Name: "cpu", Time: 10 * Second, Value: 0},
{Name: "cpu", Time: 20 * Second, Value: -10},
}}, nil
}
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 50}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Integral_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 5 * Second, Value: 10},
{Name: "cpu", Time: 5 * Second, Value: 30},
{Name: "cpu", Time: 10 * Second, Value: 40},
}}, nil
}
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value) FROM cpu`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 250}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Integral_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 5 * Second, Value: 10},
{Name: "cpu", Time: 5 * Second, Value: 30},
{Name: "cpu", Time: 10 * Second, Value: 40},
}}, nil
}
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT integral(value, 2s) FROM cpu`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 0, Value: 125}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_MovingAverage_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {