package query

import (
	"bytes"
	"encoding/binary"
	"math/rand"
	"sort"
	"time"

	"github.com/influxdata/influxdb/pkg/estimator/hll"
)
2016-04-07 21:37:48 +00:00
2016-03-02 23:42:00 +00:00
{{with $ types := .}}{{range $k := $ types }}
2016-03-02 20:52:03 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . Name }}PointAggregator aggregates points to produce a single point.
type {{ $ k . Name }}PointAggregator interface {
2016-03-07 18:25:45 +00:00
Aggregate{{ $ k . Name }}(p *{{ $ k . Name }}Point)
}
// {{ $ k . Name }}BulkPointAggregator aggregates multiple points at a time.
type {{ $ k . Name }}BulkPointAggregator interface {
Aggregate{{ $ k . Name }}Bulk(points []{{ $ k . Name }}Point)
}
2016-03-02 23:42:00 +00:00
// {{ $ k . Name }}PointEmitter produces a single point from an aggregate.
type {{ $ k . Name }}PointEmitter interface {
2016-03-07 18:25:45 +00:00
Emit() []{{ $ k . Name }}Point
2016-03-02 20:52:03 +00:00
}
2016-03-02 23:42:00 +00:00
{{range $v := $ types }}
2016-03-02 20:52:03 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . Name }}Reduce{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Func is the function called by a {{ $ k . Name }}Point reducer.
type {{ $ k . Name }}Reduce{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Func func(prev *{{ $ v . Name }}Point, curr *{{ $ k . Name }}Point) (t int64, v {{ $ v . Type }}, aux []interface{})
2016-06-20 13:51:02 +00:00
// {{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
2016-03-02 23:42:00 +00:00
type {{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer struct {
prev *{{ $ v . Name }}Point
fn {{ $ k . Name }}Reduce{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Func
2016-03-02 20:52:03 +00:00
}
2016-06-20 13:51:02 +00:00
// New{{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer creates a new {{ $ k . Name }}Func{{ $ v . Name }}Reducer.
2016-06-01 21:52:47 +00:00
func New{{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer(fn {{ $ k . Name }}Reduce{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Func, prev *{{ $ v . Name }}Point) *{{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer {
return &{{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer{fn: fn, prev: prev}
2016-03-02 20:52:03 +00:00
}
2016-06-20 13:51:02 +00:00
// Aggregate{{ $ k . Name }} takes a {{ $ k . Name }}Point and invokes the reduce function with the
// current and new point to modify the current point.
2016-03-07 18:25:45 +00:00
func (r *{{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer) Aggregate{{ $ k . Name }}(p *{{ $ k . Name }}Point) {
2016-03-02 20:52:03 +00:00
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
2016-03-02 23:42:00 +00:00
r.prev = &{{ $ v . Name }}Point{}
2016-03-02 20:52:03 +00:00
}
2016-03-07 18:25:45 +00:00
r.prev.Time = t
2016-03-02 20:52:03 +00:00
r.prev.Value = v
r.prev.Aux = aux
2016-03-07 18:25:45 +00:00
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
2016-06-20 13:51:02 +00:00
// Emit emits the point that was generated when reducing the points fed in with Aggregate{{ $ k . Name }}.
2016-03-07 18:25:45 +00:00
func (r *{{ $ k . Name }}Func{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer) Emit() []{{ $ v . Name }}Point {
return []{{ $ v . Name }}Point{*r.prev}
}
// {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc is the function called by a {{$k.Name}}Point reducer.
type {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc func(a []{{$k.Name}}Point) []{{$v.Name}}Point
2016-06-20 13:51:02 +00:00
// {{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
2016-03-07 18:25:45 +00:00
type {{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer struct {
points []{{ $ k . Name }}Point
fn {{ $ k . Name }}Reduce{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}SliceFunc
}
2016-06-20 13:51:02 +00:00
// New{{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer creates a new {{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer.
2016-03-07 18:25:45 +00:00
func New{{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer(fn {{ $ k . Name }}Reduce{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}SliceFunc) *{{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer {
return &{{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer{fn: fn}
}
2016-06-20 13:51:02 +00:00
// Aggregate{{ $ k . Name }} copies the {{ $ k . Name }}Point into the internal slice to be passed
// to the reduce function when Emit is called.
2016-03-07 18:25:45 +00:00
func (r *{{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer) Aggregate{{ $ k . Name }}(p *{{ $ k . Name }}Point) {
Update subqueries so groupings are propagated to inner queries
Previously, only time expressions got propagated inwards. The reason for
this was simple. If the outer query was going to filter to a specific
time range, then it would be unnecessary for the inner query to output
points within that time frame. It started as an optimization, but became
a feature because there was no reason to have the user repeat the same
time clause for the inner query as the outer query. So we allowed an
aggregate query with an interval to pass validation in the subquery if
the outer query had a time range. But `GROUP BY` clauses were not
propagated because that same logic didn't apply to them. It's not an
optimization there. So while grouping by a tag in the outer query
without grouping by it in the inner query was useless, there wasn't any
particular reason to care.
Then a bug was found where wildcards would propagate the dimensions
correctly, but the outer query containing a group by with the inner
query omitting it wouldn't correctly filter out the outer group by. We
could fix that filtering, but on further review, I had been seeing
people make that same mistake a lot. People seem to just believe that
the grouping should be propagated inwards. Instead of trying to fight
what the user wanted and explicitly erase groupings that weren't
propagated manually, we might as well just propagate them for the user
to make their lives easier. There is no useful situation where you would
want to group into buckets that can't physically exist so we might as
well do _something_ useful.
This will also now propagate time intervals to inner queries since the
same applies there. But, while the interval propagates, the following
query will not pass validation since it is still not possible to use a
grouping interval with a raw query (even if the inner query is an
aggregate):
SELECT * FROM (SELECT mean(value) FROM cpu) WHERE time > now() - 5m GROUP BY time(1m)
This also means wildcards will behave a bit differently. They will
retrieve dimensions from the sources in the inner query rather than just
using the dimensions in the group by.
Fixing top() and bottom() to return the correct auxiliary fields.
Unfortunately, we were not copying the buffer with the auxiliary fields
so those values would be overwritten by a later point.
2017-01-17 19:48:20 +00:00
r.points = append(r.points, *p.Clone())
2016-03-07 18:25:45 +00:00
}
2016-06-20 13:51:02 +00:00
// Aggregate{{ $ k . Name }}Bulk performs a bulk copy of {{ $ k . Name }}Points into the internal slice.
// This is a more efficient version of calling Aggregate{{ $ k . Name }} on each point.
2016-03-07 18:25:45 +00:00
func (r *{{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer) Aggregate{{ $ k . Name }}Bulk(points []{{ $ k . Name }}Point) {
r.points = append(r.points, points...)
2016-03-02 20:52:03 +00:00
}
2016-06-20 13:51:02 +00:00
// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
2016-03-07 18:25:45 +00:00
func (r *{{ $ k . Name }}SliceFunc{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}Reducer) Emit() []{{ $ v . Name }}Point {
return r.fn(r.points)
2016-03-02 20:52:03 +00:00
}
2016-04-07 21:37:48 +00:00
{{end}}
2021-02-08 13:38:14 +00:00
// {{ $ k . Name }}SumHllReducer returns the HLL sketch for a series, in string form
type {{ $ k . Name }}SumHllReducer struct {
plus *hll.Plus
}
// func New{{ $ k . Name }}SumHllReducer creates a new {{ $ k . Name }}SumHllReducer
func New{{ $ k . Name }}SumHllReducer() *{{ $ k . Name }}SumHllReducer {
return &{{ $ k . Name }}SumHllReducer{plus:hll.NewDefaultPlus()}
}
// Aggregate{{ $ k . Name }} aggregates a point into the reducer.
func (r *{{ $ k . Name }}SumHllReducer) Aggregate{{ $ k . Name }}(p *{{ $ k . Name }}Point) {
{{if eq $ k . Type "string"}}
b := []byte(p.Value)
{{else}}
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, p.Value)
b := buf.Bytes()
{{end}}
r.plus.Add(b)
}
// Emit emits the distinct points that have been aggregated into the reducer.
func (r *{{ $ k . Name }}SumHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, nil),
}
}
2016-04-07 21:37:48 +00:00
// {{ $ k . Name }}DistinctReducer returns the distinct points in a series.
type {{ $ k . Name }}DistinctReducer struct {
m map[{{ $ k . Type }}]{{ $ k . Name }}Point
}
// New{{ $ k . Name }}DistinctReducer creates a new {{ $ k . Name }}DistinctReducer.
func New{{ $ k . Name }}DistinctReducer() *{{ $ k . Name }}DistinctReducer {
return &{{ $ k . Name }}DistinctReducer{m: make(map[{{ $ k . Type }}]{{ $ k . Name }}Point)}
}
// Aggregate{{ $ k . Name }} aggregates a point into the reducer.
func (r *{{ $ k . Name }}DistinctReducer) Aggregate{{ $ k . Name }}(p *{{ $ k . Name }}Point) {
if _, ok := r.m[p.Value]; !ok {
r.m[p.Value] = *p
}
}
// Emit emits the distinct points that have been aggregated into the reducer.
func (r *{{ $ k . Name }}DistinctReducer) Emit() []{{ $ k . Name }}Point {
points := make([]{{ $ k . Name }}Point, 0, len(r.m))
for _, p := range r.m {
points = append(points, {{ $ k . Name }}Point{Time: p.Time, Value: p.Value})
}
sort.Sort({{ $ k . name }}Points(points))
return points
}
2016-04-19 16:36:41 +00:00
// {{ $ k . Name }}ElapsedReducer calculates the elapsed of the aggregated points.
type {{ $ k . Name }}ElapsedReducer struct {
unitConversion int64
prev {{ $ k . Name }}Point
curr {{ $ k . Name }}Point
}
// New{{ $ k . Name }}ElapsedReducer creates a new {{ $ k . Name }}ElapsedReducer.
func New{{ $ k . Name }}ElapsedReducer(interval Interval) *{{ $ k . Name }}ElapsedReducer {
return &{{ $ k . Name }}ElapsedReducer{
unitConversion: int64(interval.Duration),
prev: {{ $ k . Name }}Point{Nil: true},
curr: {{ $ k . Name }}Point{Nil: true},
}
}
// Aggregate{{ $ k . Name }} aggregates a point into the reducer and updates the current window.
func (r *{{ $ k . Name }}ElapsedReducer) Aggregate{{ $ k . Name }}(p *{{ $ k . Name }}Point) {
r.prev = r.curr
r.curr = *p
}
// Emit emits the elapsed of the reducer at the current point.
func (r *{{ $ k . Name }}ElapsedReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
return []IntegerPoint{
{Time: r.curr.Time, Value: elapsed},
}
}
return nil
}
2016-11-23 20:32:42 +00:00
// {{ $ k . Name }}SampleReducer implements a reservoir sampling to calculate a random subset of points
2016-10-04 21:20:35 +00:00
type {{ $ k . Name }}SampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer
points {{ $ k . name }}Points // the reservoir
}
// New{{ $ k . Name }}SampleReducer creates a new {{ $ k . Name }}SampleReducer
func New{{ $ k . Name }}SampleReducer(size int) *{{ $ k . Name }}SampleReducer {
return &{{ $ k . Name }}SampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make({{ $ k . name }}Points, size),
}
}
// Aggregate{{ $ k . Name }} aggregates a point into the reducer.
func (r *{{ $ k . Name }}SampleReducer) Aggregate{{ $ k . Name }}(p *{{ $ k . Name }}Point) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
2016-11-23 20:32:42 +00:00
p.CopyTo(&r.points[r.count-1])
2016-10-04 21:20:35 +00:00
return
}
// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
2016-12-13 18:06:33 +00:00
rnd := r.rng.Intn(r.count)
2016-10-04 21:20:35 +00:00
if rnd < len(r.points) {
2016-11-23 20:32:42 +00:00
p.CopyTo(&r.points[rnd])
2016-10-04 21:20:35 +00:00
}
}
// Emit emits the reservoir sample as many points.
func (r *{{ $ k . Name }}SampleReducer) Emit() []{{ $ k . Name }}Point {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}
2016-04-19 16:36:41 +00:00
2016-04-07 21:37:48 +00:00
{{end}}{{end}}