Merge pull request #6113 from influxdata/js-6112-simple-moving-average

Implement simple moving average
pull/6167/head
Jonathan A. Sternberg 2016-03-30 20:57:55 -04:00
commit 178a6e2f0a
10 changed files with 1922 additions and 5 deletions

View File

@ -15,6 +15,7 @@
- [#6116](https://github.com/influxdata/influxdb/pull/6116): Allow `httpd` service to be extensible for routes
- [#6111](https://github.com/influxdata/influxdb/pull/6111): Add ability to build static assest. Improved handling of TAR and ZIP package outputs.
- [#1825](https://github.com/influxdata/influxdb/issues/1825): Implement difference function.
- [#6112](https://github.com/influxdata/influxdb/issues/6112): Implement simple moving average function.
- [#6149](https://github.com/influxdata/influxdb/pull/6149): Kill running queries when server is shutdown.
- [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading
- [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages.

View File

@ -2073,6 +2073,215 @@ cpu value=20 1278010021000000000
}
}
// Ensure the server can handle various group by time moving average queries.
func TestServer_Query_SelectGroupByTimeMovingAverage(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=15 1278010021000000000
cpu value=20 1278010022000000000
cpu value=25 1278010023000000000
cpu value=30 1278010024000000000
cpu value=35 1278010025000000000
`)},
}
test.addQueries([]*Query{
&Query{
name: "calculate moving average of count",
command: `SELECT moving_average(count(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",2],["2010-07-01T18:47:04Z",2]]}]}]}`,
},
&Query{
name: "calculate moving average of mean",
command: `SELECT moving_average(mean(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",17.5],["2010-07-01T18:47:04Z",27.5]]}]}]}`,
},
&Query{
name: "calculate moving average of median",
command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",17.5],["2010-07-01T18:47:04Z",27.5]]}]}]}`,
},
&Query{
name: "calculate moving average of sum",
command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",35],["2010-07-01T18:47:04Z",55]]}]}]}`,
},
&Query{
name: "calculate moving average of first",
command: `SELECT moving_average(first(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`,
},
&Query{
name: "calculate moving average of last",
command: `SELECT moving_average(last(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",20],["2010-07-01T18:47:04Z",30]]}]}]}`,
},
&Query{
name: "calculate moving average of min",
command: `SELECT moving_average(min(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`,
},
&Query{
name: "calculate moving average of max",
command: `SELECT moving_average(max(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",20],["2010-07-01T18:47:04Z",30]]}]}]}`,
},
&Query{
name: "calculate moving average of percentile",
command: `SELECT moving_average(percentile(value, 50), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`,
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// Ensure the server can handle various group by time moving average queries.
func TestServer_Query_SelectGroupByTimeMovingAverageWithFill(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=15 1278010021000000000
cpu value=30 1278010024000000000
cpu value=35 1278010025000000000
`)},
}
test.addQueries([]*Query{
&Query{
name: "calculate moving average of count with fill 0",
command: `SELECT moving_average(count(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",1],["2010-07-01T18:47:04Z",1]]}]}]}`,
},
&Query{
name: "calculate moving average of count with fill previous",
command: `SELECT moving_average(count(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",2],["2010-07-01T18:47:04Z",2]]}]}]}`,
},
&Query{
name: "calculate moving average of mean with fill 0",
command: `SELECT moving_average(mean(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",6.25],["2010-07-01T18:47:04Z",16.25]]}]}]}`,
},
&Query{
name: "calculate moving average of mean with fill previous",
command: `SELECT moving_average(mean(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",22.5]]}]}]}`,
},
&Query{
name: "calculate moving average of median with fill 0",
command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",6.25],["2010-07-01T18:47:04Z",16.25]]}]}]}`,
},
&Query{
name: "calculate moving average of median with fill previous",
command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",22.5]]}]}]}`,
},
&Query{
name: "calculate moving average of sum with fill 0",
command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",32.5]]}]}]}`,
},
&Query{
name: "calculate moving average of sum with fill previous",
command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",25],["2010-07-01T18:47:04Z",45]]}]}]}`,
},
&Query{
name: "calculate moving average of first with fill 0",
command: `SELECT moving_average(first(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`,
},
&Query{
name: "calculate moving average of first with fill previous",
command: `SELECT moving_average(first(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",10],["2010-07-01T18:47:04Z",20]]}]}]}`,
},
&Query{
name: "calculate moving average of last with fill 0",
command: `SELECT moving_average(last(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",7.5],["2010-07-01T18:47:04Z",17.5]]}]}]}`,
},
&Query{
name: "calculate moving average of last with fill previous",
command: `SELECT moving_average(last(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`,
},
&Query{
name: "calculate moving average of min with fill 0",
command: `SELECT moving_average(min(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`,
},
&Query{
name: "calculate moving average of min with fill previous",
command: `SELECT moving_average(min(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",10],["2010-07-01T18:47:04Z",20]]}]}]}`,
},
&Query{
name: "calculate moving average of max with fill 0",
command: `SELECT moving_average(max(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",7.5],["2010-07-01T18:47:04Z",17.5]]}]}]}`,
},
&Query{
name: "calculate moving average of max with fill previous",
command: `SELECT moving_average(max(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`,
},
&Query{
name: "calculate moving average of percentile with fill 0",
command: `SELECT moving_average(percentile(value, 50), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`,
},
&Query{
name: "calculate moving average of percentile with fill previous",
command: `SELECT moving_average(percentile(value, 50), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",10],["2010-07-01T18:47:04Z",20]]}]}]}`,
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// mergeMany ensures that when merging many series together and some of them have a different number
// of points than others in a group by interval the results are correct
func TestServer_Query_MergeMany(t *testing.T) {

View File

@ -1476,7 +1476,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
for _, f := range s.Fields {
for _, expr := range walkFunctionCalls(f.Expr) {
switch expr.Name {
case "derivative", "non_negative_derivative", "difference":
case "derivative", "non_negative_derivative", "difference", "moving_average":
if err := s.validSelectWithAggregate(); err != nil {
return err
}
@ -1489,6 +1489,18 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
if got := len(expr.Args); got != 1 {
return fmt.Errorf("invalid number of arguments for difference, expected 1, got %d", got)
}
case "moving_average":
if got := len(expr.Args); got != 2 {
return fmt.Errorf("invalid number of arguments for moving_average, expected 2, got %d", got)
}
if lit, ok := expr.Args[1].(*IntegerLiteral); !ok {
return fmt.Errorf("second argument for moving_average must be an integer, got %T", expr.Args[1])
} else if lit.Val <= 1 {
return fmt.Errorf("moving_average window must be greater than 1, got %d", lit.Val)
} else if int64(int(lit.Val)) != lit.Val {
return fmt.Errorf("moving_average window too large, got %d", lit.Val)
}
}
// Validate that if they have grouping by time, they need a sub-call like min/max, etc.
groupByInterval, err := s.GroupByInterval()

View File

@ -1180,3 +1180,23 @@ func IntegerDifferenceReduceSlice(a []IntegerPoint) []IntegerPoint {
}
return output
}
// newMovingAverageIterator returns an iterator for operating on a moving_average() call.
func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatMovingAverageReducer(n)
return fn, fn
}
return newFloatStreamFloatIterator(input, createFn, opt), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerMovingAverageReducer(n)
return fn, fn
}
return newIntegerStreamFloatIterator(input, createFn, opt), nil
default:
return nil, fmt.Errorf("unsupported moving average iterator type: %T", input)
}
}

View File

@ -1,14 +1,17 @@
package influxql
// FloatMeanReducer calculates the mean of the aggregated points.
type FloatMeanReducer struct {
sum float64
count uint32
}
// NewFloatMeanReducer creates a new FloatMeanReducer.
func NewFloatMeanReducer() *FloatMeanReducer {
return &FloatMeanReducer{}
}
// AggregateFloat aggregates a point into the reducer.
func (r *FloatMeanReducer) AggregateFloat(p *FloatPoint) {
if p.Aggregated >= 2 {
r.sum += p.Value * float64(p.Aggregated)
@ -19,6 +22,7 @@ func (r *FloatMeanReducer) AggregateFloat(p *FloatPoint) {
}
}
// Emit emits the mean of the aggregated points as a single point.
func (r *FloatMeanReducer) Emit() []FloatPoint {
return []FloatPoint{{
Time: ZeroTime,
@ -27,15 +31,18 @@ func (r *FloatMeanReducer) Emit() []FloatPoint {
}}
}
// IntegerMeanReducer calculates the mean of the aggregated points.
type IntegerMeanReducer struct {
sum int64
count uint32
}
// NewIntegerMeanReducer creates a new IntegerMeanReducer.
func NewIntegerMeanReducer() *IntegerMeanReducer {
return &IntegerMeanReducer{}
}
// AggregateInteger aggregates a point into the reducer.
func (r *IntegerMeanReducer) AggregateInteger(p *IntegerPoint) {
if p.Aggregated >= 2 {
r.sum += p.Value * int64(p.Aggregated)
@ -46,6 +53,7 @@ func (r *IntegerMeanReducer) AggregateInteger(p *IntegerPoint) {
}
}
// Emit emits the mean of the aggregated points as a single point.
func (r *IntegerMeanReducer) Emit() []FloatPoint {
return []FloatPoint{{
Time: ZeroTime,
@ -53,3 +61,97 @@ func (r *IntegerMeanReducer) Emit() []FloatPoint {
Aggregated: r.count,
}}
}
// FloatMovingAverageReducer calculates the moving average of the aggregated points.
type FloatMovingAverageReducer struct {
pos int
sum float64
time int64
buf []float64
}
// NewFloatMovingAverageReducer creates a new FloatMovingAverageReducer.
func NewFloatMovingAverageReducer(n int) *FloatMovingAverageReducer {
return &FloatMovingAverageReducer{
buf: make([]float64, 0, n),
}
}
// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatMovingAverageReducer) AggregateFloat(p *FloatPoint) {
if len(r.buf) != cap(r.buf) {
r.buf = append(r.buf, p.Value)
} else {
r.sum -= r.buf[r.pos]
r.buf[r.pos] = p.Value
}
r.sum += p.Value
r.time = p.Time
r.pos++
if r.pos >= cap(r.buf) {
r.pos = 0
}
}
// Emit emits the moving average of the current window. Emit should be called
// after every call to AggregateFloat and it will produce one point if there
// is enough data to fill a window, otherwise it will produce zero points.
func (r *FloatMovingAverageReducer) Emit() []FloatPoint {
if len(r.buf) != cap(r.buf) {
return []FloatPoint{}
}
return []FloatPoint{
{
Value: r.sum / float64(len(r.buf)),
Time: r.time,
Aggregated: uint32(len(r.buf)),
},
}
}
// IntegerMovingAverageReducer calculates the moving average of the aggregated points.
type IntegerMovingAverageReducer struct {
pos int
sum int64
time int64
buf []int64
}
// NewIntegerMovingAverageReducer creates a new IntegerMovingAverageReducer.
func NewIntegerMovingAverageReducer(n int) *IntegerMovingAverageReducer {
return &IntegerMovingAverageReducer{
buf: make([]int64, 0, n),
}
}
// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerMovingAverageReducer) AggregateInteger(p *IntegerPoint) {
if len(r.buf) != cap(r.buf) {
r.buf = append(r.buf, p.Value)
} else {
r.sum -= r.buf[r.pos]
r.buf[r.pos] = p.Value
}
r.sum += p.Value
r.time = p.Time
r.pos++
if r.pos >= cap(r.buf) {
r.pos = 0
}
}
// Emit emits the moving average of the current window. Emit should be called
// after every call to AggregateInteger and it will produce one point if there
// is enough data to fill a window, otherwise it will produce zero points.
func (r *IntegerMovingAverageReducer) Emit() []FloatPoint {
if len(r.buf) != cap(r.buf) {
return []FloatPoint{}
}
return []FloatPoint{
{
Value: float64(r.sum) / float64(len(r.buf)),
Time: r.time,
Aggregated: uint32(len(r.buf)),
},
}
}

File diff suppressed because it is too large Load Diff

View File

@ -811,7 +811,7 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats { return
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() *{{.Name}}Point {
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() *{{$v.Name}}Point {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
@ -841,7 +841,7 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
startTime, endTime := itr.opt.Window(itr.input.peekTime())
// Create points by tags.
m := make(map[string]*{{$k.name}}Reduce{{.Name}}Point)
m := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
@ -851,7 +851,7 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
@ -897,6 +897,92 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
return a
}
// {{$k.name}}Stream{{$v.Name}}Iterator
type {{$k.name}}Stream{{$v.Name}}Iterator struct {
input *buf{{$k.Name}}Iterator
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
opt IteratorOptions
m map[string]*{{$k.name}}Reduce{{$v.Name}}Point
points []{{$v.Name}}Point
}
func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator {
return &{{$k.name}}Stream{{$v.Name}}Iterator{
input: newBuf{{$k.Name}}Iterator(input),
create: createFn,
opt: opt,
m: make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point),
}
}
// Stats returns stats from the input iterator.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
// Next returns the next value for the stream iterator.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Next() *{{$v.Name}}Point {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := &itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce creates and manages aggregators for every point from the input.
// After aggregating a point, it always tries to emit a value using the emitter.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
for {
// Read next point.
curr := itr.input.Next()
if curr == nil {
return nil
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := itr.m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &{{$k.name}}Reduce{{.Name}}Point{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
Emitter: emitter,
}
itr.m[id] = rp
}
rp.Aggregator.Aggregate{{$k.Name}}(curr)
// Attempt to emit points from the aggregator.
points := rp.Emitter.Emit()
if len(points) == 0 {
continue
}
for i := range points {
points[i].Name = rp.Name
points[i].Tags = rp.Tags
}
return points
}
}
// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator executes a function to modify an existing point
// for every output of the input iterator.
type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator struct {

View File

@ -247,6 +247,57 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// moving_average
{
s: `SELECT moving_average(field1, 3) FROM myseries;`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "moving_average", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.IntegerLiteral{Val: 3}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
},
},
{
s: fmt.Sprintf(`SELECT moving_average(max(field1), 3) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{
Expr: &influxql.Call{
Name: "moving_average",
Args: []influxql.Expr{
&influxql.Call{
Name: "max",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
},
},
&influxql.IntegerLiteral{Val: 3},
},
},
},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: time.Minute},
},
},
},
},
Condition: &influxql.BinaryExpr{
Op: influxql.GT,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: now.UTC()},
},
},
},
// SELECT statement (lowercase)
{
s: `select my_field from myseries`,
@ -1828,6 +1879,15 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT difference(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
{s: `SELECT difference(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT difference(mean(value)) FROM myseries where time < now() and time > now() - 1d`, err: `difference aggregate requires a GROUP BY interval`},
{s: `SELECT moving_average(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `SELECT moving_average() from myseries`, err: `invalid number of arguments for moving_average, expected 2, got 0`},
{s: `SELECT moving_average(value) FROM myseries`, err: `invalid number of arguments for moving_average, expected 2, got 1`},
{s: `SELECT moving_average(value, 2) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to moving_average`},
{s: `SELECT moving_average(top(value), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for top, expected at least 2, got 1`},
{s: `SELECT moving_average(bottom(value), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for bottom, expected at least 2, got 1`},
{s: `SELECT moving_average(max(), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
{s: `SELECT moving_average(percentile(value), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT moving_average(mean(value), 2) FROM myseries where time < now() and time > now() - 1d`, err: `moving_average aggregate requires a GROUP BY interval`},
{s: `SELECT field1 from myseries WHERE host =~ 'asd' LIMIT 1`, err: `found asd, expected regex at line 1, char 42`},
{s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
{s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},

View File

@ -223,7 +223,7 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter
return nil, err
}
return NewIntervalIterator(input, opt), nil
case "derivative", "non_negative_derivative", "difference":
case "derivative", "non_negative_derivative", "difference", "moving_average":
input, err := buildExprIterator(expr.Args[0], ic, opt)
if err != nil {
return nil, err
@ -236,6 +236,9 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter
return newDerivativeIterator(input, opt, interval, isNonNegative)
case "difference":
return newDifferenceIterator(input, opt)
case "moving_average":
n := expr.Args[1].(*IntegerLiteral)
return newMovingAverageIterator(input, int(n.Val), opt)
}
panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name))
default:

View File

@ -1943,6 +1943,54 @@ func TestSelect_Difference_Integer(t *testing.T) {
}
}
func TestSelect_MovingAverage_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 8 * Second, Value: 19},
{Name: "cpu", Time: 12 * Second, Value: 3},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT moving_average(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 15, Aggregated: 2}},
{&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 14.5, Aggregated: 2}},
{&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 11, Aggregated: 2}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_MovingAverage_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 8 * Second, Value: 19},
{Name: "cpu", Time: 12 * Second, Value: 3},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT moving_average(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 15, Aggregated: 2}},
{&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 14.5, Aggregated: 2}},
{&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 11, Aggregated: 2}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_UnsupportedCall(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {