Merge pull request #2569 from influxdb/jw-1822

Add derivative functions
pull/2582/head
Jason Wilder 2015-05-14 16:33:18 -06:00
commit a5b8f2f924
8 changed files with 757 additions and 12 deletions

View File

@ -1,10 +1,17 @@
## v0.9.0-rc31 [unreleased]
### Features
- [#1822](https://github.com/influxdb/influxdb/issues/1822): Wire up DERIVATIVE aggregate
- [#1477](https://github.com/influxdb/influxdb/issues/1477): Wire up non_negative_derivative function
### Bugfixes
- [#2545](https://github.com/influxdb/influxdb/pull/2545): Use "value" as the field name for graphite input. Thanks @cannium.
- [#2558](https://github.com/influxdb/influxdb/pull/2558): Fix client response check - thanks @vladlopes!
- [2566](https://github.com/influxdb/influxdb/pull/2566): Wait until each data write has been commited by the Raft cluster.
## PRs
- [#2569](https://github.com/influxdb/influxdb/pull/2569): Add derivative functions
## v0.9.0-rc30 [2015-05-12]
### Release Notes

View File

@ -642,6 +642,31 @@ type SelectStatement struct {
FillValue interface{}
}
// HasDerivative returns true if one of the function calls in the statement is a
// derivative aggregate
func (s *SelectStatement) HasDerivative() bool {
for _, f := range s.FunctionCalls() {
if strings.HasSuffix(f.Name, "derivative") {
return true
}
}
return false
}
// IsSimpleDerivative return true if one of the function call is a derivative function with a
// variable ref as the first arg
func (s *SelectStatement) IsSimpleDerivative() bool {
for _, f := range s.FunctionCalls() {
if strings.HasSuffix(f.Name, "derivative") {
// it's nested if the first argument is an aggregate function
if _, ok := f.Args[0].(*VarRef); ok {
return true
}
}
}
return false
}
// Clone returns a deep copy of the statement.
func (s *SelectStatement) Clone() *SelectStatement {
clone := &SelectStatement{
@ -876,6 +901,51 @@ func (s *SelectStatement) Validate(tr targetRequirement) error {
}
}
if err := s.validateDerivative(); err != nil {
return err
}
return nil
}
func (s *SelectStatement) validateDerivative() error {
if !s.HasDerivative() {
return nil
}
// If a derivative is requested, it must be the only field in the query. We don't support
// multiple fields in combination w/ derivaties yet.
if len(s.Fields) != 1 {
return fmt.Errorf("derivative cannot be used with other fields")
}
aggr := s.FunctionCalls()
if len(aggr) != 1 {
return fmt.Errorf("derivative cannot be used with other fields")
}
// Derivative requires two arguments
derivativeCall := aggr[0]
if len(derivativeCall.Args) == 0 {
return fmt.Errorf("derivative requires a field argument")
}
// First arg must be a field or aggr over a field e.g. (mean(field))
_, callOk := derivativeCall.Args[0].(*Call)
_, varOk := derivativeCall.Args[0].(*VarRef)
if !(callOk || varOk) {
return fmt.Errorf("derivative requires a field argument")
}
// If a duration arg is pased, make sure it's a duration
if len(derivativeCall.Args) == 2 {
// Second must be a duration .e.g (1h)
if _, ok := derivativeCall.Args[1].(*DurationLiteral); !ok {
return fmt.Errorf("derivative requires a duration argument")
}
}
return nil
}

View File

@ -76,8 +76,8 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
}
defer m.Close()
// if it's a raw query we handle processing differently
if m.stmt.IsRawQuery {
// if it's a raw query or a non-nested derivative we handle processing differently
if m.stmt.IsRawQuery || m.stmt.IsSimpleDerivative() {
m.processRawQuery(out, filterEmptyResults)
return
}
@ -196,6 +196,9 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
// handle any fill options
resultValues = m.processFill(resultValues)
// process derivatives
resultValues = m.processDerivative(resultValues)
row := &Row{
Name: m.MeasurementName,
Tags: m.TagSet.Tags,
@ -228,6 +231,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
valuesOffset := 0
valuesToReturn := make([]*rawQueryMapOutput, 0)
var lastValueFromPreviousChunk *rawQueryMapOutput
// loop until we've emptied out all the mappers and sent everything out
for {
// collect up to the limit for each mapper
@ -324,6 +328,10 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
// hit the chunk size? Send out what has been accumulated, but keep
// processing.
if len(valuesToReturn) >= m.chunkSize {
lastValueFromPreviousChunk = valuesToReturn[len(valuesToReturn)-1]
valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn)
row := m.processRawResults(valuesToReturn)
// perform post-processing, such as math.
row.Values = m.processResults(row.Values)
@ -342,6 +350,8 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
out <- m.processRawResults(nil)
}
} else {
valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn)
row := m.processRawResults(valuesToReturn)
// perform post-processing, such as math.
row.Values = m.processResults(row.Values)
@ -349,6 +359,136 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
}
}
// derivativeInterval returns the time interval for the one (and only) derivative func
func (m *MapReduceJob) derivativeInterval() time.Duration {
if len(m.stmt.FunctionCalls()[0].Args) == 2 {
return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val
}
if m.stmt.groupByInterval > 0 {
return m.stmt.groupByInterval
}
return time.Second
}
func (m *MapReduceJob) isNonNegativeDerivative() bool {
return m.stmt.FunctionCalls()[0].Name == "non_negative_derivative"
}
func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput {
// If we're called and do not have a derivative aggregate function, then return what was passed in
if !m.stmt.HasDerivative() {
return valuesToReturn
}
if len(valuesToReturn) == 0 {
return valuesToReturn
}
// If we only have 1 value, then the value did not change, so return
// a single row with 0.0
if len(valuesToReturn) == 1 {
return []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: valuesToReturn[0].Time,
Values: 0.0,
},
}
}
if lastValueFromPreviousChunk == nil {
lastValueFromPreviousChunk = valuesToReturn[0]
}
// Determines whether to drop negative differences
isNonNegative := m.isNonNegativeDerivative()
derivativeValues := []*rawQueryMapOutput{}
for i := 1; i < len(valuesToReturn); i++ {
v := valuesToReturn[i]
// Calculate the derivative of successive points by dividing the difference
// of each value by the elapsed time normalized to the interval
diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64)
elapsed := v.Time - lastValueFromPreviousChunk.Time
value := 0.0
if elapsed > 0 {
value = diff / (float64(elapsed) / float64(m.derivativeInterval()))
}
lastValueFromPreviousChunk = v
// Drop negative values for non-negative derivatives
if isNonNegative && diff < 0 {
continue
}
derivativeValues = append(derivativeValues, &rawQueryMapOutput{
Time: v.Time,
Values: value,
})
}
return derivativeValues
}
// processDerivative returns the derivatives of the results
func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{} {
// Return early if we're not supposed to process the derivatives
if !m.stmt.HasDerivative() {
return results
}
// Return early if we can't calculate derivatives
if len(results) == 0 {
return results
}
// If we only have 1 value, then the value did not change, so return
// a single row w/ 0.0
if len(results) == 1 {
return [][]interface{}{
[]interface{}{results[0][0], 0.0},
}
}
// Determines whether to drop negative differences
isNonNegative := m.isNonNegativeDerivative()
// Otherwise calculate the derivatives as the difference between consecutive
// points divided by the elapsed time. Then normalize to the requested
// interval.
derivatives := [][]interface{}{}
for i := 1; i < len(results); i++ {
prev := results[i-1]
cur := results[i]
if cur[1] == nil || prev[1] == nil {
continue
}
elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time))
diff := cur[1].(float64) - prev[1].(float64)
value := 0.0
if elapsed > 0 {
value = float64(diff) / (float64(elapsed) / float64(m.derivativeInterval()))
}
// Drop negative values for non-negative derivatives
if isNonNegative && diff < 0 {
continue
}
val := []interface{}{
cur[0],
value,
}
derivatives = append(derivatives, val)
}
return derivatives
}
// processsResults will apply any math that was specified in the select statement against the passed in results
func (m *MapReduceJob) processResults(results [][]interface{}) [][]interface{} {
hasMath := false

423
influxql/engine_test.go Normal file
View File

@ -0,0 +1,423 @@
package influxql
import (
"fmt"
"math"
"testing"
"time"
)
func derivativeJob(t *testing.T, fn, interval string) *MapReduceJob {
if interval != "" {
interval = ", " + interval
}
q, err := ParseQuery(fmt.Sprintf("SELECT %s(mean(value)%s) FROM foo", fn, interval))
if err != nil {
t.Fatalf("failed to parse query: %s", err)
}
m := &MapReduceJob{
stmt: q.Statements[0].(*SelectStatement),
}
return m
}
// TestProccessDerivative tests the processDerivative transformation function on the engine.
// The is called for a query with a group by.
func TestProcessDerivative(t *testing.T) {
tests := []struct {
name string
fn string
interval string
in [][]interface{}
exp [][]interface{}
}{
{
name: "empty input",
fn: "derivative",
interval: "1d",
in: [][]interface{}{},
exp: [][]interface{}{},
},
{
name: "single row returns 0.0",
fn: "derivative",
interval: "1d",
in: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 1.0,
},
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 0.0,
},
},
},
{
name: "derivative normalized to 1s by default",
fn: "derivative",
interval: "",
in: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 1.0,
},
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 3.0,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 5.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 9.0,
},
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 2.0 / 86400,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 2.0 / 86400,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 4.0 / 86400,
},
},
},
{
name: "basic derivative",
fn: "derivative",
interval: "1d",
in: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 1.0,
},
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 3.0,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 5.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 9.0,
},
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 2.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
},
},
},
{
name: "12h interval",
fn: "derivative",
interval: "12h",
in: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 1.0,
},
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 3.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
},
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 0.5,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 0.5,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 0.5,
},
},
},
{
name: "negative derivatives",
fn: "derivative",
interval: "1d",
in: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 1.0,
},
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 0.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
},
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 1.0,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), -2.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
},
},
},
{
name: "negative derivatives",
fn: "non_negative_derivative",
interval: "1d",
in: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 1.0,
},
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
},
// Show resultes in negative derivative
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), 0.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
},
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), 1.0,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
},
},
},
}
for _, test := range tests {
m := derivativeJob(t, test.fn, test.interval)
got := m.processDerivative(test.in)
if len(got) != len(test.exp) {
t.Fatalf("processDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp))
}
for i := 0; i < len(test.exp); i++ {
if test.exp[i][0] != got[i][0] || test.exp[i][1] != got[i][1] {
t.Fatalf("processDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
}
}
}
}
// TestProcessRawQueryDerivative tests the processRawQueryDerivative transformation function on the engine.
// The is called for a queries that do not have a group by.
func TestProcessRawQueryDerivative(t *testing.T) {
tests := []struct {
name string
fn string
interval string
in []*rawQueryMapOutput
exp []*rawQueryMapOutput
}{
{
name: "empty input",
fn: "derivative",
interval: "1d",
in: []*rawQueryMapOutput{},
exp: []*rawQueryMapOutput{},
},
{
name: "single row returns 0.0",
fn: "derivative",
interval: "1d",
in: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Unix(),
Values: 1.0,
},
},
exp: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Unix(),
Values: 0.0,
},
},
},
{
name: "basic derivative",
fn: "derivative",
interval: "1d",
in: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Unix(),
Values: 0.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 3.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Values: 5.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 9.0,
},
},
exp: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 3.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Values: 2.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 4.0,
},
},
},
{
name: "12h interval",
fn: "derivative",
interval: "12h",
in: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).UnixNano(),
Values: 1.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 2.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Values: 3.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 4.0,
},
},
exp: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 0.5,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Values: 0.5,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 0.5,
},
},
},
{
name: "negative derivatives",
fn: "derivative",
interval: "1d",
in: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Unix(),
Values: 1.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 2.0,
},
// should go negative
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Values: 0.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 4.0,
},
},
exp: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 1.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Values: -2.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 4.0,
},
},
},
{
name: "negative derivatives",
fn: "non_negative_derivative",
interval: "1d",
in: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Unix(),
Values: 1.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 2.0,
},
// should go negative
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Values: 0.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 4.0,
},
},
exp: []*rawQueryMapOutput{
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Values: 1.0,
},
&rawQueryMapOutput{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Values: 4.0,
},
},
},
}
for _, test := range tests {
m := derivativeJob(t, test.fn, test.interval)
got := m.processRawQueryDerivative(nil, test.in)
if len(got) != len(test.exp) {
t.Fatalf("processRawQueryDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp))
}
for i := 0; i < len(test.exp); i++ {
if test.exp[i].Time != got[i].Time || math.Abs((test.exp[i].Values.(float64)-got[i].Values.(float64))) > 0.0000001 {
t.Fatalf("processRawQueryDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
}
}
}
}

View File

@ -12,6 +12,7 @@ import (
"math"
"math/rand"
"sort"
"strings"
)
// Iterator represents a forward-only iterator over a set of points.
@ -41,16 +42,25 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
// Ensure that there is either a single argument or if for percentile, two
if c.Name == "percentile" {
if len(c.Args) != 2 {
return nil, fmt.Errorf("expected two arguments for percentile()")
return nil, fmt.Errorf("expected two arguments for %s()", c.Name)
}
} else if strings.HasSuffix(c.Name, "derivative") {
// derivatives require a field name and optional duration
if len(c.Args) == 0 {
return nil, fmt.Errorf("expected field name argument for %s()", c.Name)
}
} else if len(c.Args) != 1 {
return nil, fmt.Errorf("expected one argument for %s()", c.Name)
}
// Ensure the argument is a variable reference.
_, ok := c.Args[0].(*VarRef)
if !ok {
return nil, fmt.Errorf("expected field argument in %s()", c.Name)
// derivative can take a nested aggregate function, everything else expects
// a variable reference as the first arg
if !strings.HasSuffix(c.Name, "derivative") {
// Ensure the argument is a variable reference.
_, ok := c.Args[0].(*VarRef)
if !ok {
return nil, fmt.Errorf("expected field argument in %s()", c.Name)
}
}
// Retrieve map function by name.
@ -81,6 +91,13 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
return nil, fmt.Errorf("expected float argument in percentile()")
}
return MapEcho, nil
case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then
// use the map func for that nested aggregate
if fn, ok := c.Args[0].(*Call); ok {
return InitializeMapFunc(fn)
}
return MapRawQuery, nil
default:
return nil, fmt.Errorf("function not found: %q", c.Name)
}
@ -120,6 +137,13 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
return nil, fmt.Errorf("expected float argument in percentile()")
}
return ReducePercentile(lit.Val), nil
case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then
// use the map func for that nested aggregate
if fn, ok := c.Args[0].(*Call); ok {
return InitializeReduceFunc(fn)
}
return nil, fmt.Errorf("expected function argument to %s", c.Name)
default:
return nil, fmt.Errorf("function not found: %q", c.Name)
}
@ -774,6 +798,10 @@ type rawQueryMapOutput struct {
Values interface{}
}
func (r *rawQueryMapOutput) String() string {
return fmt.Sprintf("{%#v %#v}", r.Time, r.Values)
}
type rawOutputs []*rawQueryMapOutput
func (a rawOutputs) Len() int { return len(a) }

View File

@ -1,6 +1,10 @@
package influxql
import "testing"
import (
"testing"
"time"
)
import "sort"
type point struct {
@ -99,6 +103,50 @@ func TestInitializeMapFuncPercentile(t *testing.T) {
}
}
func TestInitializeMapFuncDerivative(t *testing.T) {
for _, fn := range []string{"derivative", "non_negative_derivative"} {
// No args should fail
c := &Call{
Name: fn,
Args: []Expr{},
}
_, err := InitializeMapFunc(c)
if err == nil {
t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
}
// Single field arg should return MapEcho
c = &Call{
Name: fn,
Args: []Expr{
&VarRef{Val: " field1"},
&DurationLiteral{Val: time.Hour},
},
}
_, err = InitializeMapFunc(c)
if err != nil {
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
}
// Nested Aggregate func should return the map func for the nested aggregate
c = &Call{
Name: fn,
Args: []Expr{
&Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}},
&DurationLiteral{Val: time.Hour},
},
}
_, err = InitializeMapFunc(c)
if err != nil {
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
}
}
}
func TestInitializeReduceFuncPercentile(t *testing.T) {
// No args
c := &Call{

View File

@ -106,6 +106,18 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// SELECT statement
{
s: `SELECT derivative(field1, 1h) FROM myseries;`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "derivative", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.DurationLiteral{Val: time.Hour}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
},
},
// SELECT statement (lowercase)
{
s: `select my_field from myseries`,
@ -1001,6 +1013,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT field1 FROM 12`, err: `found 12, expected identifier at line 1, char 20`},
{s: `SELECT 1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 FROM myseries`, err: `unable to parse number at line 1, char 8`},
{s: `SELECT 10.5h FROM myseries`, err: `found h, expected FROM at line 1, char 12`},
{s: `SELECT derivative(field1), field1 FROM myseries`, err: `derivative cannot be used with other fields`},
{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
{s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`},
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},

24
tx.go
View File

@ -93,13 +93,20 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
}
}
// If a numerical aggregate is requested, ensure it is only performed on numeric data.
// If a numerical aggregate is requested, ensure it is only performed on numeric data or on a
// nested aggregate on numeric data.
for _, a := range stmt.FunctionCalls() {
lit, ok := a.Args[0].(*influxql.VarRef)
// Check for fields like `derivative(mean(value), 1d)`
var nested *influxql.Call = a
if fn, ok := nested.Args[0].(*influxql.Call); ok {
nested = fn
}
lit, ok := nested.Args[0].(*influxql.VarRef)
if !ok {
return nil, fmt.Errorf("aggregate call didn't contain a field %s", a.String())
}
if influxql.IsNumeric(a) {
if influxql.IsNumeric(nested) {
f := m.FieldByName(lit.Val)
if f.Type != influxql.Float && f.Type != influxql.Integer {
return nil, fmt.Errorf("aggregate '%s' requires numerical field values. Field '%s' is of type %s",
@ -348,7 +355,16 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int)
l.limit = math.MaxUint64
}
} else {
lit, _ := c.Args[0].(*influxql.VarRef)
// Check for calls like `derivative(mean(value), 1d)`
var nested *influxql.Call = c
if fn, ok := c.Args[0].(*influxql.Call); ok {
nested = fn
}
lit, ok := nested.Args[0].(*influxql.VarRef)
if !ok {
return fmt.Errorf("aggregate call didn't contain a field %s", c.String())
}
fieldName = lit.Val
}