influxdb/query/functions.gen.go

// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: functions.gen.go.tmpl

package query

import (
	"math/rand"
	"sort"
	"time"
)

// FloatPointAggregator aggregates points to produce a single point.
type FloatPointAggregator interface {
	AggregateFloat(p *FloatPoint)
}

// FloatBulkPointAggregator aggregates multiple points at a time.
type FloatBulkPointAggregator interface {
	AggregateFloatBulk(points []FloatPoint)
}

// AggregateFloatPoints feeds a slice of FloatPoint into an
// aggregator. If the aggregator is a FloatBulkPointAggregator, it will
// use the AggregateFloatBulk method.
func AggregateFloatPoints(a FloatPointAggregator, points []FloatPoint) {
	switch a := a.(type) {
	case FloatBulkPointAggregator:
		a.AggregateFloatBulk(points)
	default:
		for _, p := range points {
			a.AggregateFloat(&p)
		}
	}
}

// FloatPointEmitter produces a single point from an aggregate.
type FloatPointEmitter interface {
	Emit() []FloatPoint
}
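
// exampleAggregateAndEmit is an illustrative sketch, not part of the
// template output: it shows the usual way the aggregator and emitter
// interfaces are driven together. The function name is an assumption for
// this example; any reducer below that implements both interfaces fits.
func exampleAggregateAndEmit(r interface {
	FloatPointAggregator
	FloatPointEmitter
}, points []FloatPoint) []FloatPoint {
	// Feed every input point in; AggregateFloatPoints picks the bulk
	// path automatically when the reducer supports it.
	AggregateFloatPoints(r, points)
	// Emit returns the reduced result.
	return r.Emit()
}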

// FloatReduceFunc is the function called by a FloatPoint reducer.
type FloatReduceFunc func(prev *FloatPoint, curr *FloatPoint) (t int64, v float64, aux []interface{})

// FloatFuncReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type FloatFuncReducer struct {
	prev *FloatPoint
	fn   FloatReduceFunc
}

// NewFloatFuncReducer creates a new FloatFuncReducer.
func NewFloatFuncReducer(fn FloatReduceFunc, prev *FloatPoint) *FloatFuncReducer {
	return &FloatFuncReducer{fn: fn, prev: prev}
}

// AggregateFloat takes a FloatPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *FloatFuncReducer) AggregateFloat(p *FloatPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &FloatPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
func (r *FloatFuncReducer) Emit() []FloatPoint {
	return []FloatPoint{*r.prev}
}
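
// floatMaxReduce is an illustrative FloatReduceFunc, an assumption for this
// example rather than part of the generated code: it keeps the point with
// the largest value seen so far.
func floatMaxReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
	// prev is nil on the first call, so the first point always wins.
	if prev == nil || curr.Value > prev.Value {
		return curr.Time, curr.Value, curr.Aux
	}
	return prev.Time, prev.Value, prev.Aux
}

// A reducer built from it would be constructed as:
//
//	r := NewFloatFuncReducer(floatMaxReduce, nil)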

// FloatReduceSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceSliceFunc func(a []FloatPoint) []FloatPoint

// FloatSliceFuncReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type FloatSliceFuncReducer struct {
	points []FloatPoint
	fn     FloatReduceSliceFunc
}

// NewFloatSliceFuncReducer creates a new FloatSliceFuncReducer.
func NewFloatSliceFuncReducer(fn FloatReduceSliceFunc) *FloatSliceFuncReducer {
	return &FloatSliceFuncReducer{fn: fn}
}

// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncReducer) AggregateFloat(p *FloatPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
// This is a more efficient version of calling AggregateFloat on each point.
func (r *FloatSliceFuncReducer) AggregateFloatBulk(points []FloatPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *FloatSliceFuncReducer) Emit() []FloatPoint {
	return r.fn(r.points)
}
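
// floatMedianReduceSlice is an illustrative FloatReduceSliceFunc, an
// assumption for this example rather than template output: it reduces the
// collected points to a single point carrying their median value.
func floatMedianReduceSlice(a []FloatPoint) []FloatPoint {
	if len(a) == 0 {
		return nil
	}
	// Sort a copy of the values so the reducer's buffer is left untouched.
	vals := make([]float64, len(a))
	for i, p := range a {
		vals[i] = p.Value
	}
	sort.Float64s(vals)
	v := vals[len(vals)/2]
	if len(vals)%2 == 0 {
		v = (vals[len(vals)/2-1] + vals[len(vals)/2]) / 2
	}
	return []FloatPoint{{Time: a[0].Time, Value: v}}
}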

// FloatReduceIntegerFunc is the function called by a FloatPoint reducer.
type FloatReduceIntegerFunc func(prev *IntegerPoint, curr *FloatPoint) (t int64, v int64, aux []interface{})

// FloatFuncIntegerReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type FloatFuncIntegerReducer struct {
	prev *IntegerPoint
	fn   FloatReduceIntegerFunc
}

// NewFloatFuncIntegerReducer creates a new FloatFuncIntegerReducer.
func NewFloatFuncIntegerReducer(fn FloatReduceIntegerFunc, prev *IntegerPoint) *FloatFuncIntegerReducer {
	return &FloatFuncIntegerReducer{fn: fn, prev: prev}
}

// AggregateFloat takes a FloatPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *FloatFuncIntegerReducer) AggregateFloat(p *FloatPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &IntegerPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
func (r *FloatFuncIntegerReducer) Emit() []IntegerPoint {
	return []IntegerPoint{*r.prev}
}
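
// floatCountReduce is an illustrative FloatReduceIntegerFunc, assumed for
// this example: it counts float points into an integer result, which is why
// a float-to-integer reducer is the right shape for it.
func floatCountReduce(prev *IntegerPoint, curr *FloatPoint) (int64, int64, []interface{}) {
	// prev is nil on the first call; start the count at one.
	if prev == nil {
		return curr.Time, 1, nil
	}
	return prev.Time, prev.Value + 1, nil
}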

// FloatReduceIntegerSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceIntegerSliceFunc func(a []FloatPoint) []IntegerPoint

// FloatSliceFuncIntegerReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type FloatSliceFuncIntegerReducer struct {
	points []FloatPoint
	fn     FloatReduceIntegerSliceFunc
}

// NewFloatSliceFuncIntegerReducer creates a new FloatSliceFuncIntegerReducer.
func NewFloatSliceFuncIntegerReducer(fn FloatReduceIntegerSliceFunc) *FloatSliceFuncIntegerReducer {
	return &FloatSliceFuncIntegerReducer{fn: fn}
}

// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncIntegerReducer) AggregateFloat(p *FloatPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
// This is a more efficient version of calling AggregateFloat on each point.
func (r *FloatSliceFuncIntegerReducer) AggregateFloatBulk(points []FloatPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *FloatSliceFuncIntegerReducer) Emit() []IntegerPoint {
	return r.fn(r.points)
}

// FloatReduceUnsignedFunc is the function called by a FloatPoint reducer.
type FloatReduceUnsignedFunc func(prev *UnsignedPoint, curr *FloatPoint) (t int64, v uint64, aux []interface{})

// FloatFuncUnsignedReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type FloatFuncUnsignedReducer struct {
	prev *UnsignedPoint
	fn   FloatReduceUnsignedFunc
}

// NewFloatFuncUnsignedReducer creates a new FloatFuncUnsignedReducer.
func NewFloatFuncUnsignedReducer(fn FloatReduceUnsignedFunc, prev *UnsignedPoint) *FloatFuncUnsignedReducer {
	return &FloatFuncUnsignedReducer{fn: fn, prev: prev}
}

// AggregateFloat takes a FloatPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *FloatFuncUnsignedReducer) AggregateFloat(p *FloatPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &UnsignedPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
func (r *FloatFuncUnsignedReducer) Emit() []UnsignedPoint {
	return []UnsignedPoint{*r.prev}
}

// FloatReduceUnsignedSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceUnsignedSliceFunc func(a []FloatPoint) []UnsignedPoint

// FloatSliceFuncUnsignedReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type FloatSliceFuncUnsignedReducer struct {
	points []FloatPoint
	fn     FloatReduceUnsignedSliceFunc
}

// NewFloatSliceFuncUnsignedReducer creates a new FloatSliceFuncUnsignedReducer.
func NewFloatSliceFuncUnsignedReducer(fn FloatReduceUnsignedSliceFunc) *FloatSliceFuncUnsignedReducer {
	return &FloatSliceFuncUnsignedReducer{fn: fn}
}

// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncUnsignedReducer) AggregateFloat(p *FloatPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
// This is a more efficient version of calling AggregateFloat on each point.
func (r *FloatSliceFuncUnsignedReducer) AggregateFloatBulk(points []FloatPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *FloatSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
	return r.fn(r.points)
}

// FloatReduceStringFunc is the function called by a FloatPoint reducer.
type FloatReduceStringFunc func(prev *StringPoint, curr *FloatPoint) (t int64, v string, aux []interface{})

// FloatFuncStringReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type FloatFuncStringReducer struct {
	prev *StringPoint
	fn   FloatReduceStringFunc
}

// NewFloatFuncStringReducer creates a new FloatFuncStringReducer.
func NewFloatFuncStringReducer(fn FloatReduceStringFunc, prev *StringPoint) *FloatFuncStringReducer {
	return &FloatFuncStringReducer{fn: fn, prev: prev}
}

// AggregateFloat takes a FloatPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *FloatFuncStringReducer) AggregateFloat(p *FloatPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &StringPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
func (r *FloatFuncStringReducer) Emit() []StringPoint {
	return []StringPoint{*r.prev}
}

// FloatReduceStringSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceStringSliceFunc func(a []FloatPoint) []StringPoint

// FloatSliceFuncStringReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type FloatSliceFuncStringReducer struct {
	points []FloatPoint
	fn     FloatReduceStringSliceFunc
}

// NewFloatSliceFuncStringReducer creates a new FloatSliceFuncStringReducer.
func NewFloatSliceFuncStringReducer(fn FloatReduceStringSliceFunc) *FloatSliceFuncStringReducer {
	return &FloatSliceFuncStringReducer{fn: fn}
}

// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncStringReducer) AggregateFloat(p *FloatPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
// This is a more efficient version of calling AggregateFloat on each point.
func (r *FloatSliceFuncStringReducer) AggregateFloatBulk(points []FloatPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *FloatSliceFuncStringReducer) Emit() []StringPoint {
	return r.fn(r.points)
}

// FloatReduceBooleanFunc is the function called by a FloatPoint reducer.
type FloatReduceBooleanFunc func(prev *BooleanPoint, curr *FloatPoint) (t int64, v bool, aux []interface{})

// FloatFuncBooleanReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type FloatFuncBooleanReducer struct {
	prev *BooleanPoint
	fn   FloatReduceBooleanFunc
}

// NewFloatFuncBooleanReducer creates a new FloatFuncBooleanReducer.
func NewFloatFuncBooleanReducer(fn FloatReduceBooleanFunc, prev *BooleanPoint) *FloatFuncBooleanReducer {
	return &FloatFuncBooleanReducer{fn: fn, prev: prev}
}

// AggregateFloat takes a FloatPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *FloatFuncBooleanReducer) AggregateFloat(p *FloatPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &BooleanPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
func (r *FloatFuncBooleanReducer) Emit() []BooleanPoint {
	return []BooleanPoint{*r.prev}
}

// FloatReduceBooleanSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceBooleanSliceFunc func(a []FloatPoint) []BooleanPoint

// FloatSliceFuncBooleanReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type FloatSliceFuncBooleanReducer struct {
	points []FloatPoint
	fn     FloatReduceBooleanSliceFunc
}

// NewFloatSliceFuncBooleanReducer creates a new FloatSliceFuncBooleanReducer.
func NewFloatSliceFuncBooleanReducer(fn FloatReduceBooleanSliceFunc) *FloatSliceFuncBooleanReducer {
	return &FloatSliceFuncBooleanReducer{fn: fn}
}

// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncBooleanReducer) AggregateFloat(p *FloatPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
// This is a more efficient version of calling AggregateFloat on each point.
func (r *FloatSliceFuncBooleanReducer) AggregateFloatBulk(points []FloatPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint {
	return r.fn(r.points)
}

// FloatDistinctReducer returns the distinct points in a series.
type FloatDistinctReducer struct {
	m map[float64]FloatPoint
}

// NewFloatDistinctReducer creates a new FloatDistinctReducer.
func NewFloatDistinctReducer() *FloatDistinctReducer {
	return &FloatDistinctReducer{m: make(map[float64]FloatPoint)}
}

// AggregateFloat aggregates a point into the reducer.
func (r *FloatDistinctReducer) AggregateFloat(p *FloatPoint) {
	if _, ok := r.m[p.Value]; !ok {
		r.m[p.Value] = *p
	}
}

// Emit emits the distinct points that have been aggregated into the reducer.
func (r *FloatDistinctReducer) Emit() []FloatPoint {
	points := make([]FloatPoint, 0, len(r.m))
	for _, p := range r.m {
		points = append(points, FloatPoint{Time: p.Time, Value: p.Value})
	}
	sort.Sort(floatPoints(points))
	return points
}
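
// An illustrative use of the distinct reducer (the point values here are
// invented for the example): duplicate values are dropped on aggregation,
// and Emit returns the survivors in the floatPoints sort order.
//
//	r := NewFloatDistinctReducer()
//	AggregateFloatPoints(r, []FloatPoint{
//		{Time: 1, Value: 2.0},
//		{Time: 2, Value: 2.0}, // duplicate value, ignored
//		{Time: 3, Value: 1.0},
//	})
//	pts := r.Emit() // two points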

// FloatElapsedReducer calculates the elapsed time between the aggregated points.
type FloatElapsedReducer struct {
	unitConversion int64
	prev           FloatPoint
	curr           FloatPoint
}

// NewFloatElapsedReducer creates a new FloatElapsedReducer.
func NewFloatElapsedReducer(interval Interval) *FloatElapsedReducer {
	return &FloatElapsedReducer{
		unitConversion: int64(interval.Duration),
		prev:           FloatPoint{Nil: true},
		curr:           FloatPoint{Nil: true},
	}
}

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatElapsedReducer) AggregateFloat(p *FloatPoint) {
	r.prev = r.curr
	r.curr = *p
}

// Emit emits the elapsed time of the reducer at the current point.
func (r *FloatElapsedReducer) Emit() []IntegerPoint {
	if !r.prev.Nil {
		elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
		return []IntegerPoint{
			{Time: r.curr.Time, Value: elapsed},
		}
	}
	return nil
}
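
// An illustrative sketch of the elapsed computation (the interval literal
// below is an assumption for the example): with a one-second unit
// conversion, two points one second apart emit an elapsed value of 1.
//
//	r := NewFloatElapsedReducer(Interval{Duration: time.Second})
//	r.AggregateFloat(&FloatPoint{Time: int64(1 * time.Second)})
//	r.AggregateFloat(&FloatPoint{Time: int64(2 * time.Second)})
//	pts := r.Emit() // [{Time: <2s in ns>, Value: 1}]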

// FloatSampleReducer implements reservoir sampling to calculate a random subset of points.
type FloatSampleReducer struct {
	count  int         // how many points we've iterated over
	rng    *rand.Rand  // random number generator for each reducer
	points floatPoints // the reservoir
}

// NewFloatSampleReducer creates a new FloatSampleReducer.
func NewFloatSampleReducer(size int) *FloatSampleReducer {
	return &FloatSampleReducer{
		rng:    rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
		points: make(floatPoints, size),
	}
}

// AggregateFloat aggregates a point into the reducer.
func (r *FloatSampleReducer) AggregateFloat(p *FloatPoint) {
	r.count++
	// Fill the reservoir with the first n points.
	if r.count-1 < len(r.points) {
		p.CopyTo(&r.points[r.count-1])
		return
	}
	// Generate a random integer between 0 and count-1. If that index
	// falls within the reservoir, replace the point at index rnd with p.
	rnd := r.rng.Intn(r.count)
	if rnd < len(r.points) {
		p.CopyTo(&r.points[rnd])
	}
}

// Emit emits the points in the reservoir sample.
func (r *FloatSampleReducer) Emit() []FloatPoint {
	min := len(r.points)
	if r.count < min {
		min = r.count
	}
	pts := r.points[:min]
	sort.Sort(pts)
	return pts
}
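
// An illustrative use of the sample reducer (the size and values are
// assumptions for the example): each input point ends up in the emitted
// sample with equal probability, which is the defining property of
// reservoir sampling.
//
//	r := NewFloatSampleReducer(2)
//	AggregateFloatPoints(r, []FloatPoint{
//		{Time: 1, Value: 10}, {Time: 2, Value: 20}, {Time: 3, Value: 30},
//	})
//	pts := r.Emit() // 2 randomly chosen points, sorted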

// IntegerPointAggregator aggregates points to produce a single point.
type IntegerPointAggregator interface {
	AggregateInteger(p *IntegerPoint)
}

// IntegerBulkPointAggregator aggregates multiple points at a time.
type IntegerBulkPointAggregator interface {
	AggregateIntegerBulk(points []IntegerPoint)
}

// AggregateIntegerPoints feeds a slice of IntegerPoint into an
// aggregator. If the aggregator is an IntegerBulkPointAggregator, it will
// use the AggregateIntegerBulk method.
func AggregateIntegerPoints(a IntegerPointAggregator, points []IntegerPoint) {
	switch a := a.(type) {
	case IntegerBulkPointAggregator:
		a.AggregateIntegerBulk(points)
	default:
		for _, p := range points {
			a.AggregateInteger(&p)
		}
	}
}

// IntegerPointEmitter produces a single point from an aggregate.
type IntegerPointEmitter interface {
	Emit() []IntegerPoint
}

// IntegerReduceFloatFunc is the function called by an IntegerPoint reducer.
type IntegerReduceFloatFunc func(prev *FloatPoint, curr *IntegerPoint) (t int64, v float64, aux []interface{})

// IntegerFuncFloatReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type IntegerFuncFloatReducer struct {
	prev *FloatPoint
	fn   IntegerReduceFloatFunc
}

// NewIntegerFuncFloatReducer creates a new IntegerFuncFloatReducer.
func NewIntegerFuncFloatReducer(fn IntegerReduceFloatFunc, prev *FloatPoint) *IntegerFuncFloatReducer {
	return &IntegerFuncFloatReducer{fn: fn, prev: prev}
}

// AggregateInteger takes an IntegerPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *IntegerFuncFloatReducer) AggregateInteger(p *IntegerPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &FloatPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
func (r *IntegerFuncFloatReducer) Emit() []FloatPoint {
	return []FloatPoint{*r.prev}
}

// IntegerReduceFloatSliceFunc is the function called by an IntegerPoint reducer.
type IntegerReduceFloatSliceFunc func(a []IntegerPoint) []FloatPoint

// IntegerSliceFuncFloatReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type IntegerSliceFuncFloatReducer struct {
	points []IntegerPoint
	fn     IntegerReduceFloatSliceFunc
}

// NewIntegerSliceFuncFloatReducer creates a new IntegerSliceFuncFloatReducer.
func NewIntegerSliceFuncFloatReducer(fn IntegerReduceFloatSliceFunc) *IntegerSliceFuncFloatReducer {
	return &IntegerSliceFuncFloatReducer{fn: fn}
}

// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncFloatReducer) AggregateInteger(p *IntegerPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
// This is a more efficient version of calling AggregateInteger on each point.
func (r *IntegerSliceFuncFloatReducer) AggregateIntegerBulk(points []IntegerPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *IntegerSliceFuncFloatReducer) Emit() []FloatPoint {
	return r.fn(r.points)
}
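
// integerMeanReduceSlice is an illustrative IntegerReduceFloatSliceFunc,
// assumed for this example: it reduces integer points to their float-typed
// mean, which is why an integer-to-float slice reducer is the right shape.
func integerMeanReduceSlice(a []IntegerPoint) []FloatPoint {
	if len(a) == 0 {
		return nil
	}
	var sum int64
	for _, p := range a {
		sum += p.Value
	}
	return []FloatPoint{{
		Time:  a[0].Time,
		Value: float64(sum) / float64(len(a)),
	}}
}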

// IntegerReduceFunc is the function called by an IntegerPoint reducer.
type IntegerReduceFunc func(prev *IntegerPoint, curr *IntegerPoint) (t int64, v int64, aux []interface{})

// IntegerFuncReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type IntegerFuncReducer struct {
	prev *IntegerPoint
	fn   IntegerReduceFunc
}

// NewIntegerFuncReducer creates a new IntegerFuncReducer.
func NewIntegerFuncReducer(fn IntegerReduceFunc, prev *IntegerPoint) *IntegerFuncReducer {
	return &IntegerFuncReducer{fn: fn, prev: prev}
}

// AggregateInteger takes an IntegerPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *IntegerFuncReducer) AggregateInteger(p *IntegerPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &IntegerPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
func (r *IntegerFuncReducer) Emit() []IntegerPoint {
	return []IntegerPoint{*r.prev}
}

// IntegerReduceSliceFunc is the function called by an IntegerPoint reducer.
type IntegerReduceSliceFunc func(a []IntegerPoint) []IntegerPoint

// IntegerSliceFuncReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type IntegerSliceFuncReducer struct {
	points []IntegerPoint
	fn     IntegerReduceSliceFunc
}

// NewIntegerSliceFuncReducer creates a new IntegerSliceFuncReducer.
func NewIntegerSliceFuncReducer(fn IntegerReduceSliceFunc) *IntegerSliceFuncReducer {
	return &IntegerSliceFuncReducer{fn: fn}
}

// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncReducer) AggregateInteger(p *IntegerPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
// This is a more efficient version of calling AggregateInteger on each point.
func (r *IntegerSliceFuncReducer) AggregateIntegerBulk(points []IntegerPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *IntegerSliceFuncReducer) Emit() []IntegerPoint {
	return r.fn(r.points)
}

// IntegerReduceUnsignedFunc is the function called by an IntegerPoint reducer.
type IntegerReduceUnsignedFunc func(prev *UnsignedPoint, curr *IntegerPoint) (t int64, v uint64, aux []interface{})

// IntegerFuncUnsignedReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type IntegerFuncUnsignedReducer struct {
	prev *UnsignedPoint
	fn   IntegerReduceUnsignedFunc
}

// NewIntegerFuncUnsignedReducer creates a new IntegerFuncUnsignedReducer.
func NewIntegerFuncUnsignedReducer(fn IntegerReduceUnsignedFunc, prev *UnsignedPoint) *IntegerFuncUnsignedReducer {
	return &IntegerFuncUnsignedReducer{fn: fn, prev: prev}
}

// AggregateInteger takes an IntegerPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *IntegerFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &UnsignedPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
func (r *IntegerFuncUnsignedReducer) Emit() []UnsignedPoint {
	return []UnsignedPoint{*r.prev}
}

// IntegerReduceUnsignedSliceFunc is the function called by an IntegerPoint reducer.
type IntegerReduceUnsignedSliceFunc func(a []IntegerPoint) []UnsignedPoint

// IntegerSliceFuncUnsignedReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type IntegerSliceFuncUnsignedReducer struct {
	points []IntegerPoint
	fn     IntegerReduceUnsignedSliceFunc
}

// NewIntegerSliceFuncUnsignedReducer creates a new IntegerSliceFuncUnsignedReducer.
func NewIntegerSliceFuncUnsignedReducer(fn IntegerReduceUnsignedSliceFunc) *IntegerSliceFuncUnsignedReducer {
	return &IntegerSliceFuncUnsignedReducer{fn: fn}
}

// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
// This is a more efficient version of calling AggregateInteger on each point.
func (r *IntegerSliceFuncUnsignedReducer) AggregateIntegerBulk(points []IntegerPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *IntegerSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
	return r.fn(r.points)
}

// IntegerReduceStringFunc is the function called by an IntegerPoint reducer.
type IntegerReduceStringFunc func(prev *StringPoint, curr *IntegerPoint) (t int64, v string, aux []interface{})

// IntegerFuncStringReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type IntegerFuncStringReducer struct {
	prev *StringPoint
	fn   IntegerReduceStringFunc
}

// NewIntegerFuncStringReducer creates a new IntegerFuncStringReducer.
func NewIntegerFuncStringReducer(fn IntegerReduceStringFunc, prev *StringPoint) *IntegerFuncStringReducer {
	return &IntegerFuncStringReducer{fn: fn, prev: prev}
}

// AggregateInteger takes an IntegerPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *IntegerFuncStringReducer) AggregateInteger(p *IntegerPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &StringPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
func (r *IntegerFuncStringReducer) Emit() []StringPoint {
	return []StringPoint{*r.prev}
}

// IntegerReduceStringSliceFunc is the function called by an IntegerPoint reducer.
type IntegerReduceStringSliceFunc func(a []IntegerPoint) []StringPoint

// IntegerSliceFuncStringReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type IntegerSliceFuncStringReducer struct {
	points []IntegerPoint
	fn     IntegerReduceStringSliceFunc
}

// NewIntegerSliceFuncStringReducer creates a new IntegerSliceFuncStringReducer.
func NewIntegerSliceFuncStringReducer(fn IntegerReduceStringSliceFunc) *IntegerSliceFuncStringReducer {
	return &IntegerSliceFuncStringReducer{fn: fn}
}

// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncStringReducer) AggregateInteger(p *IntegerPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
// This is a more efficient version of calling AggregateInteger on each point.
func (r *IntegerSliceFuncStringReducer) AggregateIntegerBulk(points []IntegerPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *IntegerSliceFuncStringReducer) Emit() []StringPoint {
	return r.fn(r.points)
}

// IntegerReduceBooleanFunc is the function called by an IntegerPoint reducer.
type IntegerReduceBooleanFunc func(prev *BooleanPoint, curr *IntegerPoint) (t int64, v bool, aux []interface{})

// IntegerFuncBooleanReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type IntegerFuncBooleanReducer struct {
	prev *BooleanPoint
	fn   IntegerReduceBooleanFunc
}

// NewIntegerFuncBooleanReducer creates a new IntegerFuncBooleanReducer.
func NewIntegerFuncBooleanReducer(fn IntegerReduceBooleanFunc, prev *BooleanPoint) *IntegerFuncBooleanReducer {
	return &IntegerFuncBooleanReducer{fn: fn, prev: prev}
}

// AggregateInteger takes an IntegerPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *IntegerFuncBooleanReducer) AggregateInteger(p *IntegerPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &BooleanPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
func (r *IntegerFuncBooleanReducer) Emit() []BooleanPoint {
	return []BooleanPoint{*r.prev}
}

// IntegerReduceBooleanSliceFunc is the function called by an IntegerPoint reducer.
type IntegerReduceBooleanSliceFunc func(a []IntegerPoint) []BooleanPoint

// IntegerSliceFuncBooleanReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type IntegerSliceFuncBooleanReducer struct {
	points []IntegerPoint
	fn     IntegerReduceBooleanSliceFunc
}

// NewIntegerSliceFuncBooleanReducer creates a new IntegerSliceFuncBooleanReducer.
func NewIntegerSliceFuncBooleanReducer(fn IntegerReduceBooleanSliceFunc) *IntegerSliceFuncBooleanReducer {
	return &IntegerSliceFuncBooleanReducer{fn: fn}
}

// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncBooleanReducer) AggregateInteger(p *IntegerPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
// This is a more efficient version of calling AggregateInteger on each point.
func (r *IntegerSliceFuncBooleanReducer) AggregateIntegerBulk(points []IntegerPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint {
	return r.fn(r.points)
}

// IntegerDistinctReducer returns the distinct points in a series.
type IntegerDistinctReducer struct {
	m map[int64]IntegerPoint
}

// NewIntegerDistinctReducer creates a new IntegerDistinctReducer.
func NewIntegerDistinctReducer() *IntegerDistinctReducer {
	return &IntegerDistinctReducer{m: make(map[int64]IntegerPoint)}
}

// AggregateInteger aggregates a point into the reducer.
func (r *IntegerDistinctReducer) AggregateInteger(p *IntegerPoint) {
	if _, ok := r.m[p.Value]; !ok {
		r.m[p.Value] = *p
	}
}

// Emit emits the distinct points that have been aggregated into the reducer.
func (r *IntegerDistinctReducer) Emit() []IntegerPoint {
	points := make([]IntegerPoint, 0, len(r.m))
	for _, p := range r.m {
		points = append(points, IntegerPoint{Time: p.Time, Value: p.Value})
	}
	sort.Sort(integerPoints(points))
	return points
}

// IntegerElapsedReducer calculates the elapsed time between the aggregated points.
type IntegerElapsedReducer struct {
	unitConversion int64
	prev           IntegerPoint
	curr           IntegerPoint
}

// NewIntegerElapsedReducer creates a new IntegerElapsedReducer.
func NewIntegerElapsedReducer(interval Interval) *IntegerElapsedReducer {
	return &IntegerElapsedReducer{
		unitConversion: int64(interval.Duration),
		prev:           IntegerPoint{Nil: true},
		curr:           IntegerPoint{Nil: true},
	}
}

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerElapsedReducer) AggregateInteger(p *IntegerPoint) {
	r.prev = r.curr
	r.curr = *p
}

// Emit emits the elapsed time of the reducer at the current point.
func (r *IntegerElapsedReducer) Emit() []IntegerPoint {
	if !r.prev.Nil {
		elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
		return []IntegerPoint{
			{Time: r.curr.Time, Value: elapsed},
		}
	}
	return nil
}

// IntegerSampleReducer implements reservoir sampling to calculate a random subset of points.
type IntegerSampleReducer struct {
	count  int           // how many points we've iterated over
	rng    *rand.Rand    // random number generator for each reducer
	points integerPoints // the reservoir
}

// NewIntegerSampleReducer creates a new IntegerSampleReducer.
func NewIntegerSampleReducer(size int) *IntegerSampleReducer {
	return &IntegerSampleReducer{
		rng:    rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
		points: make(integerPoints, size),
	}
}

// AggregateInteger aggregates a point into the reducer.
func (r *IntegerSampleReducer) AggregateInteger(p *IntegerPoint) {
	r.count++
	// Fill the reservoir with the first n points.
	if r.count-1 < len(r.points) {
		p.CopyTo(&r.points[r.count-1])
		return
	}
	// Generate a random integer between 0 and count-1. If that index
	// falls within the reservoir, replace the point at index rnd with p.
	rnd := r.rng.Intn(r.count)
	if rnd < len(r.points) {
		p.CopyTo(&r.points[rnd])
	}
}

// Emit emits the points in the reservoir sample.
func (r *IntegerSampleReducer) Emit() []IntegerPoint {
	min := len(r.points)
	if r.count < min {
		min = r.count
	}
	pts := r.points[:min]
	sort.Sort(pts)
	return pts
}

// UnsignedPointAggregator aggregates points to produce a single point.
type UnsignedPointAggregator interface {
	AggregateUnsigned(p *UnsignedPoint)
}

// UnsignedBulkPointAggregator aggregates multiple points at a time.
type UnsignedBulkPointAggregator interface {
	AggregateUnsignedBulk(points []UnsignedPoint)
}

// AggregateUnsignedPoints feeds a slice of UnsignedPoint into an
// aggregator. If the aggregator is an UnsignedBulkPointAggregator, it will
// use the AggregateUnsignedBulk method.
func AggregateUnsignedPoints(a UnsignedPointAggregator, points []UnsignedPoint) {
	switch a := a.(type) {
	case UnsignedBulkPointAggregator:
		a.AggregateUnsignedBulk(points)
	default:
		for _, p := range points {
			a.AggregateUnsigned(&p)
		}
	}
}

// UnsignedPointEmitter produces a single point from an aggregate.
type UnsignedPointEmitter interface {
	Emit() []UnsignedPoint
}

// UnsignedReduceFloatFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceFloatFunc func(prev *FloatPoint, curr *UnsignedPoint) (t int64, v float64, aux []interface{})

// UnsignedFuncFloatReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type UnsignedFuncFloatReducer struct {
	prev *FloatPoint
	fn   UnsignedReduceFloatFunc
}

// NewUnsignedFuncFloatReducer creates a new UnsignedFuncFloatReducer.
func NewUnsignedFuncFloatReducer(fn UnsignedReduceFloatFunc, prev *FloatPoint) *UnsignedFuncFloatReducer {
	return &UnsignedFuncFloatReducer{fn: fn, prev: prev}
}

// AggregateUnsigned takes an UnsignedPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *UnsignedFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &FloatPoint{}
	}
	r.prev.Time = t
	r.prev.Value = v
	r.prev.Aux = aux
	if p.Aggregated > 1 {
		r.prev.Aggregated += p.Aggregated
	} else {
		r.prev.Aggregated++
	}
}

// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
func (r *UnsignedFuncFloatReducer) Emit() []FloatPoint {
	return []FloatPoint{*r.prev}
}

// UnsignedReduceFloatSliceFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceFloatSliceFunc func(a []UnsignedPoint) []FloatPoint

// UnsignedSliceFuncFloatReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type UnsignedSliceFuncFloatReducer struct {
	points []UnsignedPoint
	fn     UnsignedReduceFloatSliceFunc
}

// NewUnsignedSliceFuncFloatReducer creates a new UnsignedSliceFuncFloatReducer.
func NewUnsignedSliceFuncFloatReducer(fn UnsignedReduceFloatSliceFunc) *UnsignedSliceFuncFloatReducer {
	return &UnsignedSliceFuncFloatReducer{fn: fn}
}

// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *UnsignedSliceFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) {
	r.points = append(r.points, *p.Clone())
}

// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
// This is a more efficient version of calling AggregateUnsigned on each point.
func (r *UnsignedSliceFuncFloatReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *UnsignedSliceFuncFloatReducer) Emit() []FloatPoint {
	return r.fn(r.points)
}
// UnsignedReduceIntegerFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceIntegerFunc func(prev *IntegerPoint, curr *UnsignedPoint) (t int64, v int64, aux []interface{})
// UnsignedFuncIntegerReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type UnsignedFuncIntegerReducer struct {
prev *IntegerPoint
fn UnsignedReduceIntegerFunc
}
// NewUnsignedFuncIntegerReducer creates a new UnsignedFuncIntegerReducer.
func NewUnsignedFuncIntegerReducer(fn UnsignedReduceIntegerFunc, prev *IntegerPoint) *UnsignedFuncIntegerReducer {
return &UnsignedFuncIntegerReducer{fn: fn, prev: prev}
}
// AggregateUnsigned takes an UnsignedPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *UnsignedFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
func (r *UnsignedFuncIntegerReducer) Emit() []IntegerPoint {
return []IntegerPoint{*r.prev}
}
// UnsignedReduceIntegerSliceFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceIntegerSliceFunc func(a []UnsignedPoint) []IntegerPoint
// UnsignedSliceFuncIntegerReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type UnsignedSliceFuncIntegerReducer struct {
points []UnsignedPoint
fn UnsignedReduceIntegerSliceFunc
}
// NewUnsignedSliceFuncIntegerReducer creates a new UnsignedSliceFuncIntegerReducer.
func NewUnsignedSliceFuncIntegerReducer(fn UnsignedReduceIntegerSliceFunc) *UnsignedSliceFuncIntegerReducer {
return &UnsignedSliceFuncIntegerReducer{fn: fn}
}
// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
// This is a more efficient version of calling AggregateUnsigned on each point.
func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *UnsignedSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// UnsignedReduceFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceFunc func(prev *UnsignedPoint, curr *UnsignedPoint) (t int64, v uint64, aux []interface{})
// UnsignedFuncReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type UnsignedFuncReducer struct {
prev *UnsignedPoint
fn UnsignedReduceFunc
}
// NewUnsignedFuncReducer creates a new UnsignedFuncReducer.
func NewUnsignedFuncReducer(fn UnsignedReduceFunc, prev *UnsignedPoint) *UnsignedFuncReducer {
return &UnsignedFuncReducer{fn: fn, prev: prev}
}
// AggregateUnsigned takes an UnsignedPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *UnsignedFuncReducer) AggregateUnsigned(p *UnsignedPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &UnsignedPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
func (r *UnsignedFuncReducer) Emit() []UnsignedPoint {
return []UnsignedPoint{*r.prev}
}
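// A minimal sketch of a streaming reduce function for the reducer above
// (illustrative only): a running max over unsigned points. The function
// must handle a nil prev itself, because the reducer allocates prev only
// after the first call returns.
//
//	maxFn := func(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
//		if prev == nil || curr.Value > prev.Value {
//			return curr.Time, curr.Value, curr.Aux
//		}
//		return prev.Time, prev.Value, prev.Aux
//	}
//	r := NewUnsignedFuncReducer(maxFn, nil)
//	r.AggregateUnsigned(&UnsignedPoint{Time: 1, Value: 3})
//	r.AggregateUnsigned(&UnsignedPoint{Time: 2, Value: 7})
//	out := r.Emit() // []UnsignedPoint{{Time: 2, Value: 7, Aggregated: 2}}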
// UnsignedReduceSliceFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceSliceFunc func(a []UnsignedPoint) []UnsignedPoint
// UnsignedSliceFuncReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type UnsignedSliceFuncReducer struct {
points []UnsignedPoint
fn UnsignedReduceSliceFunc
}
// NewUnsignedSliceFuncReducer creates a new UnsignedSliceFuncReducer.
func NewUnsignedSliceFuncReducer(fn UnsignedReduceSliceFunc) *UnsignedSliceFuncReducer {
return &UnsignedSliceFuncReducer{fn: fn}
}
// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *UnsignedSliceFuncReducer) AggregateUnsigned(p *UnsignedPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
// This is a more efficient version of calling AggregateUnsigned on each point.
func (r *UnsignedSliceFuncReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *UnsignedSliceFuncReducer) Emit() []UnsignedPoint {
return r.fn(r.points)
}
// UnsignedReduceStringFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceStringFunc func(prev *StringPoint, curr *UnsignedPoint) (t int64, v string, aux []interface{})
// UnsignedFuncStringReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type UnsignedFuncStringReducer struct {
prev *StringPoint
fn UnsignedReduceStringFunc
}
// NewUnsignedFuncStringReducer creates a new UnsignedFuncStringReducer.
func NewUnsignedFuncStringReducer(fn UnsignedReduceStringFunc, prev *StringPoint) *UnsignedFuncStringReducer {
return &UnsignedFuncStringReducer{fn: fn, prev: prev}
}
// AggregateUnsigned takes an UnsignedPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *UnsignedFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
func (r *UnsignedFuncStringReducer) Emit() []StringPoint {
return []StringPoint{*r.prev}
}
// UnsignedReduceStringSliceFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceStringSliceFunc func(a []UnsignedPoint) []StringPoint
// UnsignedSliceFuncStringReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type UnsignedSliceFuncStringReducer struct {
points []UnsignedPoint
fn UnsignedReduceStringSliceFunc
}
// NewUnsignedSliceFuncStringReducer creates a new UnsignedSliceFuncStringReducer.
func NewUnsignedSliceFuncStringReducer(fn UnsignedReduceStringSliceFunc) *UnsignedSliceFuncStringReducer {
return &UnsignedSliceFuncStringReducer{fn: fn}
}
// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *UnsignedSliceFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
// This is a more efficient version of calling AggregateUnsigned on each point.
func (r *UnsignedSliceFuncStringReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *UnsignedSliceFuncStringReducer) Emit() []StringPoint {
return r.fn(r.points)
}
// UnsignedReduceBooleanFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceBooleanFunc func(prev *BooleanPoint, curr *UnsignedPoint) (t int64, v bool, aux []interface{})
// UnsignedFuncBooleanReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type UnsignedFuncBooleanReducer struct {
prev *BooleanPoint
fn UnsignedReduceBooleanFunc
}
// NewUnsignedFuncBooleanReducer creates a new UnsignedFuncBooleanReducer.
func NewUnsignedFuncBooleanReducer(fn UnsignedReduceBooleanFunc, prev *BooleanPoint) *UnsignedFuncBooleanReducer {
return &UnsignedFuncBooleanReducer{fn: fn, prev: prev}
}
// AggregateUnsigned takes an UnsignedPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *UnsignedFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
func (r *UnsignedFuncBooleanReducer) Emit() []BooleanPoint {
return []BooleanPoint{*r.prev}
}
// UnsignedReduceBooleanSliceFunc is the function called by an UnsignedPoint reducer.
type UnsignedReduceBooleanSliceFunc func(a []UnsignedPoint) []BooleanPoint
// UnsignedSliceFuncBooleanReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type UnsignedSliceFuncBooleanReducer struct {
points []UnsignedPoint
fn UnsignedReduceBooleanSliceFunc
}
// NewUnsignedSliceFuncBooleanReducer creates a new UnsignedSliceFuncBooleanReducer.
func NewUnsignedSliceFuncBooleanReducer(fn UnsignedReduceBooleanSliceFunc) *UnsignedSliceFuncBooleanReducer {
return &UnsignedSliceFuncBooleanReducer{fn: fn}
}
// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
// This is a more efficient version of calling AggregateUnsigned on each point.
func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *UnsignedSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// UnsignedDistinctReducer returns the distinct points in a series.
type UnsignedDistinctReducer struct {
m map[uint64]UnsignedPoint
}
// NewUnsignedDistinctReducer creates a new UnsignedDistinctReducer.
func NewUnsignedDistinctReducer() *UnsignedDistinctReducer {
return &UnsignedDistinctReducer{m: make(map[uint64]UnsignedPoint)}
}
// AggregateUnsigned aggregates a point into the reducer.
func (r *UnsignedDistinctReducer) AggregateUnsigned(p *UnsignedPoint) {
if _, ok := r.m[p.Value]; !ok {
r.m[p.Value] = *p
}
}
// Emit emits the distinct points that have been aggregated into the reducer.
func (r *UnsignedDistinctReducer) Emit() []UnsignedPoint {
points := make([]UnsignedPoint, 0, len(r.m))
for _, p := range r.m {
points = append(points, UnsignedPoint{Time: p.Time, Value: p.Value})
}
sort.Sort(unsignedPoints(points))
return points
}
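// A short usage sketch (illustrative only): duplicate values collapse in
// the map keyed by Value, and Emit returns one point per distinct value
// in the package's sorted point order.
//
//	r := NewUnsignedDistinctReducer()
//	for i, v := range []uint64{2, 1, 2, 3} {
//		r.AggregateUnsigned(&UnsignedPoint{Time: int64(i), Value: v})
//	}
//	out := r.Emit() // three points, one each for the values 1, 2 and 3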
// UnsignedElapsedReducer calculates the elapsed time between the aggregated points.
type UnsignedElapsedReducer struct {
unitConversion int64
prev UnsignedPoint
curr UnsignedPoint
}
// NewUnsignedElapsedReducer creates a new UnsignedElapsedReducer.
func NewUnsignedElapsedReducer(interval Interval) *UnsignedElapsedReducer {
return &UnsignedElapsedReducer{
unitConversion: int64(interval.Duration),
prev: UnsignedPoint{Nil: true},
curr: UnsignedPoint{Nil: true},
}
}
// AggregateUnsigned aggregates a point into the reducer and updates the current window.
func (r *UnsignedElapsedReducer) AggregateUnsigned(p *UnsignedPoint) {
r.prev = r.curr
r.curr = *p
}
// Emit emits the elapsed time between the previous and current points.
func (r *UnsignedElapsedReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
return []IntegerPoint{
{Time: r.curr.Time, Value: elapsed},
}
}
return nil
}
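// A hedged usage sketch (illustrative only): with a one-second interval,
// the emitted value is the time delta between the previous and current
// points divided by the unit, so points three seconds apart yield 3.
//
//	r := NewUnsignedElapsedReducer(Interval{Duration: time.Second})
//	r.AggregateUnsigned(&UnsignedPoint{Time: 0})
//	r.AggregateUnsigned(&UnsignedPoint{Time: 3 * int64(time.Second)})
//	out := r.Emit() // []IntegerPoint{{Time: 3 * int64(time.Second), Value: 3}}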
// UnsignedSampleReducer implements reservoir sampling to calculate a random subset of points.
type UnsignedSampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer
points unsignedPoints // the reservoir
}
// NewUnsignedSampleReducer creates a new UnsignedSampleReducer.
func NewUnsignedSampleReducer(size int) *UnsignedSampleReducer {
return &UnsignedSampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make(unsignedPoints, size),
}
}
// AggregateUnsigned aggregates a point into the reducer.
func (r *UnsignedSampleReducer) AggregateUnsigned(p *UnsignedPoint) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
p.CopyTo(&r.points[r.count-1])
return
}
// Generate a random integer in [0, count) and, if it is
// less than the length of the reservoir, replace the point
// at that index with p.
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
p.CopyTo(&r.points[rnd])
}
}
// Emit emits the points currently held in the reservoir sample.
func (r *UnsignedSampleReducer) Emit() []UnsignedPoint {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}
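// The reducer above is classic "Algorithm R" reservoir sampling: the
// first len(points) points fill the reservoir directly, and every later
// point overwrites a uniformly chosen slot with probability size/count,
// which leaves each input equally likely to appear in the final sample.
// A short sketch (illustrative only):
//
//	r := NewUnsignedSampleReducer(2)
//	for i := 0; i < 100; i++ {
//		r.AggregateUnsigned(&UnsignedPoint{Time: int64(i), Value: uint64(i)})
//	}
//	out := r.Emit() // two points drawn uniformly from the 100, sorted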
// StringPointAggregator aggregates points to produce a single point.
type StringPointAggregator interface {
AggregateString(p *StringPoint)
}
// StringBulkPointAggregator aggregates multiple points at a time.
type StringBulkPointAggregator interface {
AggregateStringBulk(points []StringPoint)
}
// AggregateStringPoints feeds a slice of StringPoint into an
// aggregator. If the aggregator is a StringBulkPointAggregator, it will
// use the AggregateStringBulk method.
func AggregateStringPoints(a StringPointAggregator, points []StringPoint) {
switch a := a.(type) {
case StringBulkPointAggregator:
a.AggregateStringBulk(points)
default:
for _, p := range points {
a.AggregateString(&p)
}
}
}
// StringPointEmitter produces a single point from an aggregate.
type StringPointEmitter interface {
Emit() []StringPoint
}
// StringReduceFloatFunc is the function called by a StringPoint reducer.
type StringReduceFloatFunc func(prev *FloatPoint, curr *StringPoint) (t int64, v float64, aux []interface{})
// StringFuncFloatReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type StringFuncFloatReducer struct {
prev *FloatPoint
fn StringReduceFloatFunc
}
// NewStringFuncFloatReducer creates a new StringFuncFloatReducer.
func NewStringFuncFloatReducer(fn StringReduceFloatFunc, prev *FloatPoint) *StringFuncFloatReducer {
return &StringFuncFloatReducer{fn: fn, prev: prev}
}
// AggregateString takes a StringPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *StringFuncFloatReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateString.
func (r *StringFuncFloatReducer) Emit() []FloatPoint {
return []FloatPoint{*r.prev}
}
// StringReduceFloatSliceFunc is the function called by a StringPoint reducer.
type StringReduceFloatSliceFunc func(a []StringPoint) []FloatPoint
// StringSliceFuncFloatReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type StringSliceFuncFloatReducer struct {
points []StringPoint
fn StringReduceFloatSliceFunc
}
// NewStringSliceFuncFloatReducer creates a new StringSliceFuncFloatReducer.
func NewStringSliceFuncFloatReducer(fn StringReduceFloatSliceFunc) *StringSliceFuncFloatReducer {
return &StringSliceFuncFloatReducer{fn: fn}
}
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncFloatReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
// This is a more efficient version of calling AggregateString on each point.
func (r *StringSliceFuncFloatReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *StringSliceFuncFloatReducer) Emit() []FloatPoint {
return r.fn(r.points)
}
// StringReduceIntegerFunc is the function called by a StringPoint reducer.
type StringReduceIntegerFunc func(prev *IntegerPoint, curr *StringPoint) (t int64, v int64, aux []interface{})
// StringFuncIntegerReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type StringFuncIntegerReducer struct {
prev *IntegerPoint
fn StringReduceIntegerFunc
}
// NewStringFuncIntegerReducer creates a new StringFuncIntegerReducer.
func NewStringFuncIntegerReducer(fn StringReduceIntegerFunc, prev *IntegerPoint) *StringFuncIntegerReducer {
return &StringFuncIntegerReducer{fn: fn, prev: prev}
}
// AggregateString takes a StringPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *StringFuncIntegerReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateString.
func (r *StringFuncIntegerReducer) Emit() []IntegerPoint {
return []IntegerPoint{*r.prev}
}
// StringReduceIntegerSliceFunc is the function called by a StringPoint reducer.
type StringReduceIntegerSliceFunc func(a []StringPoint) []IntegerPoint
// StringSliceFuncIntegerReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type StringSliceFuncIntegerReducer struct {
points []StringPoint
fn StringReduceIntegerSliceFunc
}
// NewStringSliceFuncIntegerReducer creates a new StringSliceFuncIntegerReducer.
func NewStringSliceFuncIntegerReducer(fn StringReduceIntegerSliceFunc) *StringSliceFuncIntegerReducer {
return &StringSliceFuncIntegerReducer{fn: fn}
}
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncIntegerReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
// This is a more efficient version of calling AggregateString on each point.
func (r *StringSliceFuncIntegerReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *StringSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// StringReduceUnsignedFunc is the function called by a StringPoint reducer.
type StringReduceUnsignedFunc func(prev *UnsignedPoint, curr *StringPoint) (t int64, v uint64, aux []interface{})
// StringFuncUnsignedReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type StringFuncUnsignedReducer struct {
prev *UnsignedPoint
fn StringReduceUnsignedFunc
}
// NewStringFuncUnsignedReducer creates a new StringFuncUnsignedReducer.
func NewStringFuncUnsignedReducer(fn StringReduceUnsignedFunc, prev *UnsignedPoint) *StringFuncUnsignedReducer {
return &StringFuncUnsignedReducer{fn: fn, prev: prev}
}
// AggregateString takes a StringPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *StringFuncUnsignedReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &UnsignedPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateString.
func (r *StringFuncUnsignedReducer) Emit() []UnsignedPoint {
return []UnsignedPoint{*r.prev}
}
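// A hedged sketch of a cross-type reduce for the reducer above
// (illustrative only; assumes the caller imports strconv): summing
// string points holding numeric text into an unsigned total, skipping
// values that fail to parse.
//
//	sumParsed := func(prev *UnsignedPoint, curr *StringPoint) (t int64, v uint64, aux []interface{}) {
//		var total uint64
//		if prev != nil {
//			total = prev.Value
//		}
//		if n, err := strconv.ParseUint(curr.Value, 10, 64); err == nil {
//			total += n
//		}
//		return curr.Time, total, nil
//	}
//	r := NewStringFuncUnsignedReducer(sumParsed, nil)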
// StringReduceUnsignedSliceFunc is the function called by a StringPoint reducer.
type StringReduceUnsignedSliceFunc func(a []StringPoint) []UnsignedPoint
// StringSliceFuncUnsignedReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type StringSliceFuncUnsignedReducer struct {
points []StringPoint
fn StringReduceUnsignedSliceFunc
}
// NewStringSliceFuncUnsignedReducer creates a new StringSliceFuncUnsignedReducer.
func NewStringSliceFuncUnsignedReducer(fn StringReduceUnsignedSliceFunc) *StringSliceFuncUnsignedReducer {
return &StringSliceFuncUnsignedReducer{fn: fn}
}
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncUnsignedReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
// This is a more efficient version of calling AggregateString on each point.
func (r *StringSliceFuncUnsignedReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *StringSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
return r.fn(r.points)
}
// StringReduceFunc is the function called by a StringPoint reducer.
type StringReduceFunc func(prev *StringPoint, curr *StringPoint) (t int64, v string, aux []interface{})
// StringFuncReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type StringFuncReducer struct {
prev *StringPoint
fn StringReduceFunc
}
// NewStringFuncReducer creates a new StringFuncReducer.
func NewStringFuncReducer(fn StringReduceFunc, prev *StringPoint) *StringFuncReducer {
return &StringFuncReducer{fn: fn, prev: prev}
}
// AggregateString takes a StringPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *StringFuncReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateString.
func (r *StringFuncReducer) Emit() []StringPoint {
return []StringPoint{*r.prev}
}
// StringReduceSliceFunc is the function called by a StringPoint reducer.
type StringReduceSliceFunc func(a []StringPoint) []StringPoint
// StringSliceFuncReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type StringSliceFuncReducer struct {
points []StringPoint
fn StringReduceSliceFunc
}
// NewStringSliceFuncReducer creates a new StringSliceFuncReducer.
func NewStringSliceFuncReducer(fn StringReduceSliceFunc) *StringSliceFuncReducer {
return &StringSliceFuncReducer{fn: fn}
}
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
// This is a more efficient version of calling AggregateString on each point.
func (r *StringSliceFuncReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *StringSliceFuncReducer) Emit() []StringPoint {
return r.fn(r.points)
}
// StringReduceBooleanFunc is the function called by a StringPoint reducer.
type StringReduceBooleanFunc func(prev *BooleanPoint, curr *StringPoint) (t int64, v bool, aux []interface{})
// StringFuncBooleanReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type StringFuncBooleanReducer struct {
prev *BooleanPoint
fn StringReduceBooleanFunc
}
// NewStringFuncBooleanReducer creates a new StringFuncBooleanReducer.
func NewStringFuncBooleanReducer(fn StringReduceBooleanFunc, prev *BooleanPoint) *StringFuncBooleanReducer {
return &StringFuncBooleanReducer{fn: fn, prev: prev}
}
// AggregateString takes a StringPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *StringFuncBooleanReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateString.
func (r *StringFuncBooleanReducer) Emit() []BooleanPoint {
return []BooleanPoint{*r.prev}
}
// StringReduceBooleanSliceFunc is the function called by a StringPoint reducer.
type StringReduceBooleanSliceFunc func(a []StringPoint) []BooleanPoint
// StringSliceFuncBooleanReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type StringSliceFuncBooleanReducer struct {
points []StringPoint
fn StringReduceBooleanSliceFunc
}
// NewStringSliceFuncBooleanReducer creates a new StringSliceFuncBooleanReducer.
func NewStringSliceFuncBooleanReducer(fn StringReduceBooleanSliceFunc) *StringSliceFuncBooleanReducer {
return &StringSliceFuncBooleanReducer{fn: fn}
}
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncBooleanReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
// This is a more efficient version of calling AggregateString on each point.
func (r *StringSliceFuncBooleanReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// StringDistinctReducer returns the distinct points in a series.
type StringDistinctReducer struct {
m map[string]StringPoint
}
// NewStringDistinctReducer creates a new StringDistinctReducer.
func NewStringDistinctReducer() *StringDistinctReducer {
return &StringDistinctReducer{m: make(map[string]StringPoint)}
}
// AggregateString aggregates a point into the reducer.
func (r *StringDistinctReducer) AggregateString(p *StringPoint) {
if _, ok := r.m[p.Value]; !ok {
r.m[p.Value] = *p
}
}
// Emit emits the distinct points that have been aggregated into the reducer.
func (r *StringDistinctReducer) Emit() []StringPoint {
points := make([]StringPoint, 0, len(r.m))
for _, p := range r.m {
points = append(points, StringPoint{Time: p.Time, Value: p.Value})
}
sort.Sort(stringPoints(points))
return points
}
// StringElapsedReducer calculates the elapsed time between the aggregated points.
type StringElapsedReducer struct {
unitConversion int64
prev StringPoint
curr StringPoint
}
// NewStringElapsedReducer creates a new StringElapsedReducer.
func NewStringElapsedReducer(interval Interval) *StringElapsedReducer {
return &StringElapsedReducer{
unitConversion: int64(interval.Duration),
prev: StringPoint{Nil: true},
curr: StringPoint{Nil: true},
}
}
// AggregateString aggregates a point into the reducer and updates the current window.
func (r *StringElapsedReducer) AggregateString(p *StringPoint) {
r.prev = r.curr
r.curr = *p
}
// Emit emits the elapsed time between the previous and current points.
func (r *StringElapsedReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
return []IntegerPoint{
{Time: r.curr.Time, Value: elapsed},
}
}
return nil
}
// StringSampleReducer implements reservoir sampling to calculate a random subset of points.
type StringSampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer
points stringPoints // the reservoir
}
// NewStringSampleReducer creates a new StringSampleReducer.
func NewStringSampleReducer(size int) *StringSampleReducer {
return &StringSampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make(stringPoints, size),
}
}
// AggregateString aggregates a point into the reducer.
func (r *StringSampleReducer) AggregateString(p *StringPoint) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
p.CopyTo(&r.points[r.count-1])
return
}
// Generate a random integer in [0, count) and, if it is
// less than the length of the reservoir, replace the point
// at that index with p.
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
p.CopyTo(&r.points[rnd])
}
}
// Emit emits the points currently held in the reservoir sample.
func (r *StringSampleReducer) Emit() []StringPoint {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}
// BooleanPointAggregator aggregates points to produce a single point.
type BooleanPointAggregator interface {
AggregateBoolean(p *BooleanPoint)
}
// BooleanBulkPointAggregator aggregates multiple points at a time.
type BooleanBulkPointAggregator interface {
AggregateBooleanBulk(points []BooleanPoint)
}
// AggregateBooleanPoints feeds a slice of BooleanPoint into an
// aggregator. If the aggregator is a BooleanBulkPointAggregator, it will
// use the AggregateBooleanBulk method.
func AggregateBooleanPoints(a BooleanPointAggregator, points []BooleanPoint) {
switch a := a.(type) {
case BooleanBulkPointAggregator:
a.AggregateBooleanBulk(points)
default:
for _, p := range points {
a.AggregateBoolean(&p)
}
}
}
// BooleanPointEmitter produces a single point from an aggregate.
type BooleanPointEmitter interface {
Emit() []BooleanPoint
}
// BooleanReduceFloatFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFloatFunc func(prev *FloatPoint, curr *BooleanPoint) (t int64, v float64, aux []interface{})
// BooleanFuncFloatReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type BooleanFuncFloatReducer struct {
prev *FloatPoint
fn BooleanReduceFloatFunc
}
// NewBooleanFuncFloatReducer creates a new BooleanFuncFloatReducer.
func NewBooleanFuncFloatReducer(fn BooleanReduceFloatFunc, prev *FloatPoint) *BooleanFuncFloatReducer {
return &BooleanFuncFloatReducer{fn: fn, prev: prev}
}
// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *BooleanFuncFloatReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
func (r *BooleanFuncFloatReducer) Emit() []FloatPoint {
return []FloatPoint{*r.prev}
}
// BooleanReduceFloatSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFloatSliceFunc func(a []BooleanPoint) []FloatPoint
// BooleanSliceFuncFloatReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type BooleanSliceFuncFloatReducer struct {
points []BooleanPoint
fn BooleanReduceFloatSliceFunc
}
// NewBooleanSliceFuncFloatReducer creates a new BooleanSliceFuncFloatReducer.
func NewBooleanSliceFuncFloatReducer(fn BooleanReduceFloatSliceFunc) *BooleanSliceFuncFloatReducer {
return &BooleanSliceFuncFloatReducer{fn: fn}
}
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncFloatReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
// This is a more efficient version of calling AggregateBoolean on each point.
func (r *BooleanSliceFuncFloatReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *BooleanSliceFuncFloatReducer) Emit() []FloatPoint {
return r.fn(r.points)
}
// BooleanReduceIntegerFunc is the function called by a BooleanPoint reducer.
type BooleanReduceIntegerFunc func(prev *IntegerPoint, curr *BooleanPoint) (t int64, v int64, aux []interface{})
// BooleanFuncIntegerReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type BooleanFuncIntegerReducer struct {
prev *IntegerPoint
fn BooleanReduceIntegerFunc
}
// NewBooleanFuncIntegerReducer creates a new BooleanFuncIntegerReducer.
func NewBooleanFuncIntegerReducer(fn BooleanReduceIntegerFunc, prev *IntegerPoint) *BooleanFuncIntegerReducer {
return &BooleanFuncIntegerReducer{fn: fn, prev: prev}
}
// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *BooleanFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
func (r *BooleanFuncIntegerReducer) Emit() []IntegerPoint {
return []IntegerPoint{*r.prev}
}
// BooleanReduceIntegerSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceIntegerSliceFunc func(a []BooleanPoint) []IntegerPoint
// BooleanSliceFuncIntegerReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type BooleanSliceFuncIntegerReducer struct {
points []BooleanPoint
fn BooleanReduceIntegerSliceFunc
}
// NewBooleanSliceFuncIntegerReducer creates a new BooleanSliceFuncIntegerReducer.
func NewBooleanSliceFuncIntegerReducer(fn BooleanReduceIntegerSliceFunc) *BooleanSliceFuncIntegerReducer {
return &BooleanSliceFuncIntegerReducer{fn: fn}
}
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
// This is a more efficient version of calling AggregateBoolean on each point.
func (r *BooleanSliceFuncIntegerReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *BooleanSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// BooleanReduceUnsignedFunc is the function called by a BooleanPoint reducer.
type BooleanReduceUnsignedFunc func(prev *UnsignedPoint, curr *BooleanPoint) (t int64, v uint64, aux []interface{})
// BooleanFuncUnsignedReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type BooleanFuncUnsignedReducer struct {
prev *UnsignedPoint
fn BooleanReduceUnsignedFunc
}
// NewBooleanFuncUnsignedReducer creates a new BooleanFuncUnsignedReducer.
func NewBooleanFuncUnsignedReducer(fn BooleanReduceUnsignedFunc, prev *UnsignedPoint) *BooleanFuncUnsignedReducer {
return &BooleanFuncUnsignedReducer{fn: fn, prev: prev}
}
// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *BooleanFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &UnsignedPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
func (r *BooleanFuncUnsignedReducer) Emit() []UnsignedPoint {
return []UnsignedPoint{*r.prev}
}
// BooleanReduceUnsignedSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceUnsignedSliceFunc func(a []BooleanPoint) []UnsignedPoint
// BooleanSliceFuncUnsignedReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type BooleanSliceFuncUnsignedReducer struct {
points []BooleanPoint
fn BooleanReduceUnsignedSliceFunc
}
// NewBooleanSliceFuncUnsignedReducer creates a new BooleanSliceFuncUnsignedReducer.
func NewBooleanSliceFuncUnsignedReducer(fn BooleanReduceUnsignedSliceFunc) *BooleanSliceFuncUnsignedReducer {
return &BooleanSliceFuncUnsignedReducer{fn: fn}
}
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
// This is a more efficient version of calling AggregateBoolean on each point.
func (r *BooleanSliceFuncUnsignedReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *BooleanSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
return r.fn(r.points)
}
// BooleanReduceStringFunc is the function called by a BooleanPoint reducer.
type BooleanReduceStringFunc func(prev *StringPoint, curr *BooleanPoint) (t int64, v string, aux []interface{})
// BooleanFuncStringReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type BooleanFuncStringReducer struct {
prev *StringPoint
fn BooleanReduceStringFunc
}
// NewBooleanFuncStringReducer creates a new BooleanFuncStringReducer.
func NewBooleanFuncStringReducer(fn BooleanReduceStringFunc, prev *StringPoint) *BooleanFuncStringReducer {
return &BooleanFuncStringReducer{fn: fn, prev: prev}
}
// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *BooleanFuncStringReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
func (r *BooleanFuncStringReducer) Emit() []StringPoint {
return []StringPoint{*r.prev}
}
// BooleanReduceStringSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceStringSliceFunc func(a []BooleanPoint) []StringPoint
// BooleanSliceFuncStringReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type BooleanSliceFuncStringReducer struct {
points []BooleanPoint
fn BooleanReduceStringSliceFunc
}
// NewBooleanSliceFuncStringReducer creates a new BooleanSliceFuncStringReducer.
func NewBooleanSliceFuncStringReducer(fn BooleanReduceStringSliceFunc) *BooleanSliceFuncStringReducer {
return &BooleanSliceFuncStringReducer{fn: fn}
}
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncStringReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
// This is a more efficient version of calling AggregateBoolean on each point.
func (r *BooleanSliceFuncStringReducer) AggregateBooleanBulk(points []BooleanPoint) {
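// Note: the points are appended by value, so unlike the Clone used by
// AggregateBoolean their Aux slices still alias the caller's buffers.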
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *BooleanSliceFuncStringReducer) Emit() []StringPoint {
return r.fn(r.points)
}
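// A minimal sketch of a BooleanReduceStringSliceFunc that could be passed to
// NewBooleanSliceFuncStringReducer: it maps every buffered boolean point to a
// string point. The function name and the "true"/"false" rendering are
// illustrative assumptions, not part of the template.
func exampleBooleanSliceToString(a []BooleanPoint) []StringPoint {
	out := make([]StringPoint, 0, len(a))
	for _, p := range a {
		v := "false"
		if p.Value {
			v = "true"
		}
		out = append(out, StringPoint{Time: p.Time, Value: v, Aux: p.Aux})
	}
	return out
}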
// BooleanReduceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFunc func(prev *BooleanPoint, curr *BooleanPoint) (t int64, v bool, aux []interface{})
// BooleanFuncReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type BooleanFuncReducer struct {
prev *BooleanPoint
fn BooleanReduceFunc
}
// NewBooleanFuncReducer creates a new BooleanFuncBooleanReducer.
func NewBooleanFuncReducer(fn BooleanReduceFunc, prev *BooleanPoint) *BooleanFuncReducer {
return &BooleanFuncReducer{fn: fn, prev: prev}
}
// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *BooleanFuncReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
func (r *BooleanFuncReducer) Emit() []BooleanPoint {
return []BooleanPoint{*r.prev}
}
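// A minimal sketch of a BooleanReduceFunc implementing a "first"-style
// aggregate: keep whichever point has the earlier timestamp. Pass it to
// NewBooleanFuncReducer with a nil previous point. The function name is
// hypothetical.
func exampleBooleanFirstFn(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
	if prev == nil || curr.Time < prev.Time {
		return curr.Time, curr.Value, curr.Aux
	}
	return prev.Time, prev.Value, prev.Aux
}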
// BooleanReduceSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceSliceFunc func(a []BooleanPoint) []BooleanPoint
// BooleanSliceFuncReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type BooleanSliceFuncReducer struct {
points []BooleanPoint
fn BooleanReduceSliceFunc
}
// NewBooleanSliceFuncReducer creates a new BooleanSliceFuncReducer.
func NewBooleanSliceFuncReducer(fn BooleanReduceSliceFunc) *BooleanSliceFuncReducer {
return &BooleanSliceFuncReducer{fn: fn}
}
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
// This is a more efficient version of calling AggregateBoolean on each point.
func (r *BooleanSliceFuncReducer) AggregateBooleanBulk(points []BooleanPoint) {
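// Note: the points are appended by value, so unlike the Clone used by
// AggregateBoolean their Aux slices still alias the caller's buffers.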
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the emitted points.
// This method does not clear the points from the internal slice.
func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
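// A sketch of a BooleanReduceSliceFunc for NewBooleanSliceFuncReducer that
// emits only the chronologically last buffered point; the function name is
// illustrative.
func exampleBooleanLastFn(a []BooleanPoint) []BooleanPoint {
	if len(a) == 0 {
		return nil
	}
	last := a[0]
	for _, p := range a[1:] {
		if p.Time >= last.Time {
			last = p
		}
	}
	return []BooleanPoint{last}
}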
// BooleanDistinctReducer returns the distinct points in a series.
type BooleanDistinctReducer struct {
m map[bool]BooleanPoint
}
// NewBooleanDistinctReducer creates a new BooleanDistinctReducer.
func NewBooleanDistinctReducer() *BooleanDistinctReducer {
return &BooleanDistinctReducer{m: make(map[bool]BooleanPoint)}
}
// AggregateBoolean aggregates a point into the reducer.
func (r *BooleanDistinctReducer) AggregateBoolean(p *BooleanPoint) {
if _, ok := r.m[p.Value]; !ok {
r.m[p.Value] = *p
}
}
// Emit emits the distinct points that have been aggregated into the reducer.
func (r *BooleanDistinctReducer) Emit() []BooleanPoint {
points := make([]BooleanPoint, 0, len(r.m))
for _, p := range r.m {
points = append(points, BooleanPoint{Time: p.Time, Value: p.Value})
}
sort.Sort(booleanPoints(points))
return points
}
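// A usage sketch for BooleanDistinctReducer: the duplicate true value below is
// dropped, so Emit returns exactly two points, ordered by the package's
// booleanPoints sort. The helper name is hypothetical.
func exampleBooleanDistinct() []BooleanPoint {
	r := NewBooleanDistinctReducer()
	pts := []BooleanPoint{{Time: 1, Value: true}, {Time: 2, Value: false}, {Time: 3, Value: true}}
	for i := range pts {
		r.AggregateBoolean(&pts[i])
	}
	return r.Emit() // one true point and one false point
}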
// BooleanElapsedReducer calculates the elapsed of the aggregated points.
type BooleanElapsedReducer struct {
unitConversion int64
prev BooleanPoint
curr BooleanPoint
}
// NewBooleanElapsedReducer creates a new BooleanElapsedReducer.
func NewBooleanElapsedReducer(interval Interval) *BooleanElapsedReducer {
return &BooleanElapsedReducer{
unitConversion: int64(interval.Duration),
prev: BooleanPoint{Nil: true},
curr: BooleanPoint{Nil: true},
}
}
// AggregateBoolean aggregates a point into the reducer and updates the current window.
func (r *BooleanElapsedReducer) AggregateBoolean(p *BooleanPoint) {
r.prev = r.curr
r.curr = *p
}
// Emit emits the elapsed of the reducer at the current point.
func (r *BooleanElapsedReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
return []IntegerPoint{
{Time: r.curr.Time, Value: elapsed},
}
}
return nil
}
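// A worked sketch of the elapsed computation, assuming Interval can be
// constructed with just its Duration: with a one-second unit and consecutive
// points at t=1s and t=3s, Emit yields a single IntegerPoint with Value 2,
// i.e. (3s-1s)/1s. The helper name is hypothetical.
func exampleBooleanElapsed() []IntegerPoint {
	r := NewBooleanElapsedReducer(Interval{Duration: time.Second})
	r.AggregateBoolean(&BooleanPoint{Time: int64(1 * time.Second), Value: true})
	r.AggregateBoolean(&BooleanPoint{Time: int64(3 * time.Second), Value: false})
	return r.Emit() // [{Time: 3s, Value: 2}]
}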
// BooleanSampleReducer implements a reservoir sampling to calculate a random subset of points
type BooleanSampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer
points booleanPoints // the reservoir
}
// NewBooleanSampleReducer creates a new BooleanSampleReducer
func NewBooleanSampleReducer(size int) *BooleanSampleReducer {
return &BooleanSampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make(booleanPoints, size),
}
}
// AggregateBoolean aggregates a point into the reducer.
func (r *BooleanSampleReducer) AggregateBoolean(p *BooleanPoint) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
p.CopyTo(&r.points[r.count-1])
return
}
// Generate a random integer rnd in [0, count) and, if it is less than
// the length of the reservoir, replace the point at index rnd with p.
// This keeps each incoming point with probability len(points)/count.
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
p.CopyTo(&r.points[rnd])
}
}
// Emit emits the points currently held in the reservoir sample.
func (r *BooleanSampleReducer) Emit() []BooleanPoint {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}
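// A usage sketch for BooleanSampleReducer: stream any number of points through
// AggregateBoolean and Emit returns a uniform random sample of at most the
// reservoir size (two here). The helper name is hypothetical.
func exampleBooleanSample(points []BooleanPoint) []BooleanPoint {
	r := NewBooleanSampleReducer(2)
	for i := range points {
		r.AggregateBoolean(&points[i])
	}
	return r.Emit() // at most two points, each kept with equal probability
}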