diff --git a/CHANGELOG.md b/CHANGELOG.md index 3005f31953..7de3ed7673 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,17 @@ ## v0.9.0-rc31 [unreleased] +### Features +- [#1822](https://github.com/influxdb/influxdb/issues/1822): Wire up DERIVATIVE aggregate +- [#1477](https://github.com/influxdb/influxdb/issues/1477): Wire up non_negative_derivative function + ### Bugfixes - [#2545](https://github.com/influxdb/influxdb/pull/2545): Use "value" as the field name for graphite input. Thanks @cannium. - [#2558](https://github.com/influxdb/influxdb/pull/2558): Fix client response check - thanks @vladlopes! - [2566](https://github.com/influxdb/influxdb/pull/2566): Wait until each data write has been commited by the Raft cluster. +## PRs +- [#2569](https://github.com/influxdb/influxdb/pull/2569): Add derivative functions + ## v0.9.0-rc30 [2015-05-12] ### Release Notes diff --git a/influxql/ast.go b/influxql/ast.go index 316562ae9f..35e3bcc210 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -642,6 +642,31 @@ type SelectStatement struct { FillValue interface{} } +// HasDerivative returns true if one of the function calls in the statement is a +// derivative aggregate +func (s *SelectStatement) HasDerivative() bool { + for _, f := range s.FunctionCalls() { + if strings.HasSuffix(f.Name, "derivative") { + return true + } + } + return false +} + +// IsSimpleDerivative return true if one of the function call is a derivative function with a +// variable ref as the first arg +func (s *SelectStatement) IsSimpleDerivative() bool { + for _, f := range s.FunctionCalls() { + if strings.HasSuffix(f.Name, "derivative") { + // it's nested if the first argument is an aggregate function + if _, ok := f.Args[0].(*VarRef); ok { + return true + } + } + } + return false +} + // Clone returns a deep copy of the statement. func (s *SelectStatement) Clone() *SelectStatement { clone := &SelectStatement{ @@ -876,6 +901,51 @@ func (s *SelectStatement) Validate(tr targetRequirement) error { } } + if err := s.validateDerivative(); err != nil { + return err + } + + return nil +} + +func (s *SelectStatement) validateDerivative() error { + if !s.HasDerivative() { + return nil + } + + // If a derivative is requested, it must be the only field in the query. We don't support + // multiple fields in combination w/ derivaties yet. + if len(s.Fields) != 1 { + return fmt.Errorf("derivative cannot be used with other fields") + } + + aggr := s.FunctionCalls() + if len(aggr) != 1 { + return fmt.Errorf("derivative cannot be used with other fields") + } + + // Derivative requires two arguments + derivativeCall := aggr[0] + if len(derivativeCall.Args) == 0 { + return fmt.Errorf("derivative requires a field argument") + } + + // First arg must be a field or aggr over a field e.g. (mean(field)) + _, callOk := derivativeCall.Args[0].(*Call) + _, varOk := derivativeCall.Args[0].(*VarRef) + + if !(callOk || varOk) { + return fmt.Errorf("derivative requires a field argument") + } + + // If a duration arg is pased, make sure it's a duration + if len(derivativeCall.Args) == 2 { + // Second must be a duration .e.g (1h) + if _, ok := derivativeCall.Args[1].(*DurationLiteral); !ok { + return fmt.Errorf("derivative requires a duration argument") + } + } + return nil } diff --git a/influxql/engine.go b/influxql/engine.go index 0764cd6d47..69e4e6fe61 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -76,8 +76,8 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { } defer m.Close() - // if it's a raw query we handle processing differently - if m.stmt.IsRawQuery { + // if it's a raw query or a non-nested derivative we handle processing differently + if m.stmt.IsRawQuery || m.stmt.IsSimpleDerivative() { m.processRawQuery(out, filterEmptyResults) return } @@ -196,6 +196,9 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { // handle any fill options resultValues = m.processFill(resultValues) + // process derivatives + resultValues = m.processDerivative(resultValues) + row := &Row{ Name: m.MeasurementName, Tags: m.TagSet.Tags, @@ -228,6 +231,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { valuesOffset := 0 valuesToReturn := make([]*rawQueryMapOutput, 0) + var lastValueFromPreviousChunk *rawQueryMapOutput // loop until we've emptied out all the mappers and sent everything out for { // collect up to the limit for each mapper @@ -324,6 +328,10 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { // hit the chunk size? Send out what has been accumulated, but keep // processing. if len(valuesToReturn) >= m.chunkSize { + lastValueFromPreviousChunk = valuesToReturn[len(valuesToReturn)-1] + + valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn) + row := m.processRawResults(valuesToReturn) // perform post-processing, such as math. row.Values = m.processResults(row.Values) @@ -342,6 +350,8 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { out <- m.processRawResults(nil) } } else { + valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn) + row := m.processRawResults(valuesToReturn) // perform post-processing, such as math. row.Values = m.processResults(row.Values) @@ -349,6 +359,136 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { } } +// derivativeInterval returns the time interval for the one (and only) derivative func +func (m *MapReduceJob) derivativeInterval() time.Duration { + if len(m.stmt.FunctionCalls()[0].Args) == 2 { + return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val + } + if m.stmt.groupByInterval > 0 { + return m.stmt.groupByInterval + } + return time.Second +} + +func (m *MapReduceJob) isNonNegativeDerivative() bool { + return m.stmt.FunctionCalls()[0].Name == "non_negative_derivative" +} + +func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput { + // If we're called and do not have a derivative aggregate function, then return what was passed in + if !m.stmt.HasDerivative() { + return valuesToReturn + } + + if len(valuesToReturn) == 0 { + return valuesToReturn + } + + // If we only have 1 value, then the value did not change, so return + // a single row with 0.0 + if len(valuesToReturn) == 1 { + return []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: valuesToReturn[0].Time, + Values: 0.0, + }, + } + } + + if lastValueFromPreviousChunk == nil { + lastValueFromPreviousChunk = valuesToReturn[0] + } + + // Determines whether to drop negative differences + isNonNegative := m.isNonNegativeDerivative() + + derivativeValues := []*rawQueryMapOutput{} + for i := 1; i < len(valuesToReturn); i++ { + v := valuesToReturn[i] + + // Calculate the derivative of successive points by dividing the difference + // of each value by the elapsed time normalized to the interval + diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64) + elapsed := v.Time - lastValueFromPreviousChunk.Time + + value := 0.0 + if elapsed > 0 { + value = diff / (float64(elapsed) / float64(m.derivativeInterval())) + } + + lastValueFromPreviousChunk = v + + // Drop negative values for non-negative derivatives + if isNonNegative && diff < 0 { + continue + } + + derivativeValues = append(derivativeValues, &rawQueryMapOutput{ + Time: v.Time, + Values: value, + }) + } + + return derivativeValues +} + +// processDerivative returns the derivatives of the results +func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{} { + // Return early if we're not supposed to process the derivatives + if !m.stmt.HasDerivative() { + return results + } + + // Return early if we can't calculate derivatives + if len(results) == 0 { + return results + } + + // If we only have 1 value, then the value did not change, so return + // a single row w/ 0.0 + if len(results) == 1 { + return [][]interface{}{ + []interface{}{results[0][0], 0.0}, + } + } + + // Determines whether to drop negative differences + isNonNegative := m.isNonNegativeDerivative() + + // Otherwise calculate the derivatives as the difference between consecutive + // points divided by the elapsed time. Then normalize to the requested + // interval. + derivatives := [][]interface{}{} + for i := 1; i < len(results); i++ { + prev := results[i-1] + cur := results[i] + + if cur[1] == nil || prev[1] == nil { + continue + } + + elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time)) + diff := cur[1].(float64) - prev[1].(float64) + value := 0.0 + if elapsed > 0 { + value = float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())) + } + + // Drop negative values for non-negative derivatives + if isNonNegative && diff < 0 { + continue + } + + val := []interface{}{ + cur[0], + value, + } + derivatives = append(derivatives, val) + } + + return derivatives +} + // processsResults will apply any math that was specified in the select statement against the passed in results func (m *MapReduceJob) processResults(results [][]interface{}) [][]interface{} { hasMath := false diff --git a/influxql/engine_test.go b/influxql/engine_test.go new file mode 100644 index 0000000000..86aaba69ef --- /dev/null +++ b/influxql/engine_test.go @@ -0,0 +1,423 @@ +package influxql + +import ( + "fmt" + "math" + "testing" + "time" +) + +func derivativeJob(t *testing.T, fn, interval string) *MapReduceJob { + + if interval != "" { + interval = ", " + interval + } + + q, err := ParseQuery(fmt.Sprintf("SELECT %s(mean(value)%s) FROM foo", fn, interval)) + if err != nil { + t.Fatalf("failed to parse query: %s", err) + } + m := &MapReduceJob{ + stmt: q.Statements[0].(*SelectStatement), + } + + return m +} + +// TestProccessDerivative tests the processDerivative transformation function on the engine. +// The is called for a query with a group by. +func TestProcessDerivative(t *testing.T) { + tests := []struct { + name string + fn string + interval string + in [][]interface{} + exp [][]interface{} + }{ + { + name: "empty input", + fn: "derivative", + interval: "1d", + in: [][]interface{}{}, + exp: [][]interface{}{}, + }, + + { + name: "single row returns 0.0", + fn: "derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 0.0, + }, + }, + }, + { + name: "derivative normalized to 1s by default", + fn: "derivative", + interval: "", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 3.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 5.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 9.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0 / 86400, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 2.0 / 86400, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0 / 86400, + }, + }, + }, + { + name: "basic derivative", + fn: "derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 3.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 5.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 9.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + }, + { + name: "12h interval", + fn: "derivative", + interval: "12h", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 3.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 0.5, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 0.5, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 0.5, + }, + }, + }, + { + name: "negative derivatives", + fn: "derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 0.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), -2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + }, + { + name: "negative derivatives", + fn: "non_negative_derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + // Show resultes in negative derivative + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 0.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + }, + } + + for _, test := range tests { + m := derivativeJob(t, test.fn, test.interval) + got := m.processDerivative(test.in) + + if len(got) != len(test.exp) { + t.Fatalf("processDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp)) + } + + for i := 0; i < len(test.exp); i++ { + if test.exp[i][0] != got[i][0] || test.exp[i][1] != got[i][1] { + t.Fatalf("processDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp) + } + } + } +} + +// TestProcessRawQueryDerivative tests the processRawQueryDerivative transformation function on the engine. +// The is called for a queries that do not have a group by. +func TestProcessRawQueryDerivative(t *testing.T) { + tests := []struct { + name string + fn string + interval string + in []*rawQueryMapOutput + exp []*rawQueryMapOutput + }{ + { + name: "empty input", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{}, + exp: []*rawQueryMapOutput{}, + }, + + { + name: "single row returns 0.0", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 1.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 0.0, + }, + }, + }, + { + name: "basic derivative", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 0.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 3.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 5.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 9.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 3.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 2.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + }, + { + name: "12h interval", + fn: "derivative", + interval: "12h", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).UnixNano(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 2.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 3.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 0.5, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 0.5, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 0.5, + }, + }, + }, + { + name: "negative derivatives", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 2.0, + }, + // should go negative + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 0.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: -2.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + }, + { + name: "negative derivatives", + fn: "non_negative_derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 2.0, + }, + // should go negative + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 0.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + }, + } + + for _, test := range tests { + m := derivativeJob(t, test.fn, test.interval) + got := m.processRawQueryDerivative(nil, test.in) + + if len(got) != len(test.exp) { + t.Fatalf("processRawQueryDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp)) + } + + for i := 0; i < len(test.exp); i++ { + if test.exp[i].Time != got[i].Time || math.Abs((test.exp[i].Values.(float64)-got[i].Values.(float64))) > 0.0000001 { + t.Fatalf("processRawQueryDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp) + } + } + } +} diff --git a/influxql/functions.go b/influxql/functions.go index 7bf4396fe1..48f932b916 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -12,6 +12,7 @@ import ( "math" "math/rand" "sort" + "strings" ) // Iterator represents a forward-only iterator over a set of points. @@ -41,16 +42,25 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { // Ensure that there is either a single argument or if for percentile, two if c.Name == "percentile" { if len(c.Args) != 2 { - return nil, fmt.Errorf("expected two arguments for percentile()") + return nil, fmt.Errorf("expected two arguments for %s()", c.Name) + } + } else if strings.HasSuffix(c.Name, "derivative") { + // derivatives require a field name and optional duration + if len(c.Args) == 0 { + return nil, fmt.Errorf("expected field name argument for %s()", c.Name) } } else if len(c.Args) != 1 { return nil, fmt.Errorf("expected one argument for %s()", c.Name) } - // Ensure the argument is a variable reference. - _, ok := c.Args[0].(*VarRef) - if !ok { - return nil, fmt.Errorf("expected field argument in %s()", c.Name) + // derivative can take a nested aggregate function, everything else expects + // a variable reference as the first arg + if !strings.HasSuffix(c.Name, "derivative") { + // Ensure the argument is a variable reference. + _, ok := c.Args[0].(*VarRef) + if !ok { + return nil, fmt.Errorf("expected field argument in %s()", c.Name) + } } // Retrieve map function by name. @@ -81,6 +91,13 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { return nil, fmt.Errorf("expected float argument in percentile()") } return MapEcho, nil + case "derivative", "non_negative_derivative": + // If the arg is another aggregate e.g. derivative(mean(value)), then + // use the map func for that nested aggregate + if fn, ok := c.Args[0].(*Call); ok { + return InitializeMapFunc(fn) + } + return MapRawQuery, nil default: return nil, fmt.Errorf("function not found: %q", c.Name) } @@ -120,6 +137,13 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) { return nil, fmt.Errorf("expected float argument in percentile()") } return ReducePercentile(lit.Val), nil + case "derivative", "non_negative_derivative": + // If the arg is another aggregate e.g. derivative(mean(value)), then + // use the map func for that nested aggregate + if fn, ok := c.Args[0].(*Call); ok { + return InitializeReduceFunc(fn) + } + return nil, fmt.Errorf("expected function argument to %s", c.Name) default: return nil, fmt.Errorf("function not found: %q", c.Name) } @@ -774,6 +798,10 @@ type rawQueryMapOutput struct { Values interface{} } +func (r *rawQueryMapOutput) String() string { + return fmt.Sprintf("{%#v %#v}", r.Time, r.Values) +} + type rawOutputs []*rawQueryMapOutput func (a rawOutputs) Len() int { return len(a) } diff --git a/influxql/functions_test.go b/influxql/functions_test.go index 9f75dda9aa..e9ed06d30d 100644 --- a/influxql/functions_test.go +++ b/influxql/functions_test.go @@ -1,6 +1,10 @@ package influxql -import "testing" +import ( + "testing" + "time" +) + import "sort" type point struct { @@ -99,6 +103,50 @@ func TestInitializeMapFuncPercentile(t *testing.T) { } } +func TestInitializeMapFuncDerivative(t *testing.T) { + + for _, fn := range []string{"derivative", "non_negative_derivative"} { + // No args should fail + c := &Call{ + Name: fn, + Args: []Expr{}, + } + + _, err := InitializeMapFunc(c) + if err == nil { + t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) + } + + // Single field arg should return MapEcho + c = &Call{ + Name: fn, + Args: []Expr{ + &VarRef{Val: " field1"}, + &DurationLiteral{Val: time.Hour}, + }, + } + + _, err = InitializeMapFunc(c) + if err != nil { + t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) + } + + // Nested Aggregate func should return the map func for the nested aggregate + c = &Call{ + Name: fn, + Args: []Expr{ + &Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}}, + &DurationLiteral{Val: time.Hour}, + }, + } + + _, err = InitializeMapFunc(c) + if err != nil { + t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) + } + } +} + func TestInitializeReduceFuncPercentile(t *testing.T) { // No args c := &Call{ diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 954912f7b1..62add5f989 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -106,6 +106,18 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SELECT statement + { + s: `SELECT derivative(field1, 1h) FROM myseries;`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "derivative", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.DurationLiteral{Val: time.Hour}}}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, + }, + }, + // SELECT statement (lowercase) { s: `select my_field from myseries`, @@ -1001,6 +1013,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT field1 FROM 12`, err: `found 12, expected identifier at line 1, char 20`}, {s: `SELECT 1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 FROM myseries`, err: `unable to parse number at line 1, char 8`}, {s: `SELECT 10.5h FROM myseries`, err: `found h, expected FROM at line 1, char 12`}, + {s: `SELECT derivative(field1), field1 FROM myseries`, err: `derivative cannot be used with other fields`}, {s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`}, {s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`}, {s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, diff --git a/tx.go b/tx.go index ee5f61b025..65b08dbcfb 100644 --- a/tx.go +++ b/tx.go @@ -93,13 +93,20 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri } } - // If a numerical aggregate is requested, ensure it is only performed on numeric data. + // If a numerical aggregate is requested, ensure it is only performed on numeric data or on a + // nested aggregate on numeric data. for _, a := range stmt.FunctionCalls() { - lit, ok := a.Args[0].(*influxql.VarRef) + // Check for fields like `derivative(mean(value), 1d)` + var nested *influxql.Call = a + if fn, ok := nested.Args[0].(*influxql.Call); ok { + nested = fn + } + + lit, ok := nested.Args[0].(*influxql.VarRef) if !ok { return nil, fmt.Errorf("aggregate call didn't contain a field %s", a.String()) } - if influxql.IsNumeric(a) { + if influxql.IsNumeric(nested) { f := m.FieldByName(lit.Val) if f.Type != influxql.Float && f.Type != influxql.Integer { return nil, fmt.Errorf("aggregate '%s' requires numerical field values. Field '%s' is of type %s", @@ -348,7 +355,16 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) l.limit = math.MaxUint64 } } else { - lit, _ := c.Args[0].(*influxql.VarRef) + // Check for calls like `derivative(mean(value), 1d)` + var nested *influxql.Call = c + if fn, ok := c.Args[0].(*influxql.Call); ok { + nested = fn + } + + lit, ok := nested.Args[0].(*influxql.VarRef) + if !ok { + return fmt.Errorf("aggregate call didn't contain a field %s", c.String()) + } fieldName = lit.Val }