From 9c0584325f6ad007877826e15baeef2d687b6e75 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 12 May 2015 10:37:58 -0600 Subject: [PATCH 1/9] Add support for parsing derivative w/ nested functions --- influxql/functions.go | 34 ++++++++++++++++++++++++++++---- influxql/functions_test.go | 40 ++++++++++++++++++++++++++++++++++++++ tx.go | 10 +++++++++- 3 files changed, 79 insertions(+), 5 deletions(-) diff --git a/influxql/functions.go b/influxql/functions.go index 7bf4396fe1..ab422c98b5 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -47,10 +47,14 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { return nil, fmt.Errorf("expected one argument for %s()", c.Name) } - // Ensure the argument is a variable reference. - _, ok := c.Args[0].(*VarRef) - if !ok { - return nil, fmt.Errorf("expected field argument in %s()", c.Name) + // derivative can take a nested aggregate function, everything else expects + // a variable reference as the first arg + if c.Name != "derivative" { + // Ensure the argument is a variable reference. + _, ok := c.Args[0].(*VarRef) + if !ok { + return nil, fmt.Errorf("expected field argument in %s()", c.Name) + } } // Retrieve map function by name. @@ -81,6 +85,17 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { return nil, fmt.Errorf("expected float argument in percentile()") } return MapEcho, nil + case "derivative": + if len(c.Args) == 0 { + return nil, fmt.Errorf("expected argument in derivative()") + } + + // If the arg is another aggregate e.g. derivative(mean(value)), then + // use the map func for that nested aggregate + if fn, ok := c.Args[0].(*Call); ok { + return InitializeMapFunc(fn) + } + return MapRawQuery, nil default: return nil, fmt.Errorf("function not found: %q", c.Name) } @@ -120,6 +135,17 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) { return nil, fmt.Errorf("expected float argument in percentile()") } return ReducePercentile(lit.Val), nil + case "derivative": + if len(c.Args) == 0 { + return nil, fmt.Errorf("expected argument in derivative()") + } + + // If the arg is another aggregate e.g. derivative(mean(value)), then + // use the map func for that nested aggregate + if fn, ok := c.Args[0].(*Call); ok { + return InitializeReduceFunc(fn) + } + return nil, fmt.Errorf("expected function argument to derivative: %q", c.Name) default: return nil, fmt.Errorf("function not found: %q", c.Name) } diff --git a/influxql/functions_test.go b/influxql/functions_test.go index 9f75dda9aa..30aa21b528 100644 --- a/influxql/functions_test.go +++ b/influxql/functions_test.go @@ -1,6 +1,7 @@ package influxql import "testing" + import "sort" type point struct { @@ -99,6 +100,45 @@ func TestInitializeMapFuncPercentile(t *testing.T) { } } +func TestInitializeMapFuncDerivative(t *testing.T) { + // No args should fail + c := &Call{ + Name: "derivative", + Args: []Expr{}, + } + + _, err := InitializeMapFunc(c) + if err == nil { + t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) + } + + // Single field arg should return MapEcho + c = &Call{ + Name: "derivative", + Args: []Expr{ + &VarRef{Val: " field1"}, + }, + } + + _, err = InitializeMapFunc(c) + if err != nil { + t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) + } + + // Nested Aggregate func should return the map func for the nested aggregate + c = &Call{ + Name: "derivative", + Args: []Expr{ + &Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}}, + }, + } + + _, err = InitializeMapFunc(c) + if err != nil { + t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) + } +} + func TestInitializeReduceFuncPercentile(t *testing.T) { // No args c := &Call{ diff --git a/tx.go b/tx.go index ee5f61b025..c087558e1c 100644 --- a/tx.go +++ b/tx.go @@ -348,7 +348,15 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) l.limit = math.MaxUint64 } } else { - lit, _ := c.Args[0].(*influxql.VarRef) + var nested *influxql.Call = c + if fn, ok := c.Args[0].(*influxql.Call); ok { + nested = fn + } + + lit, ok := nested.Args[0].(*influxql.VarRef) + if !ok { + return fmt.Errorf("aggregate call didn't contain a field %s", c.String()) + } fieldName = lit.Val } From eb1d7a659f88a8716b441e215cab7f44deb1b1cf Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 12 May 2015 13:03:34 -0600 Subject: [PATCH 2/9] Prohibit multiple columns in select when using derivative May be supported in the future but workaround is to run separate queries. --- influxql/ast.go | 15 +++++++++++++++ influxql/parser_test.go | 1 + 2 files changed, 16 insertions(+) diff --git a/influxql/ast.go b/influxql/ast.go index 316562ae9f..72e1d980d4 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -642,6 +642,15 @@ type SelectStatement struct { FillValue interface{} } +func (s *SelectStatement) IsNonNestedDerivative() bool { + for _, f := range s.Fields { + if f.Name() == "derivative" { + return true + } + } + return false +} + // Clone returns a deep copy of the statement. func (s *SelectStatement) Clone() *SelectStatement { clone := &SelectStatement{ @@ -876,6 +885,12 @@ func (s *SelectStatement) Validate(tr targetRequirement) error { } } + // If a derivative is requested, it must be the only field in the query. We don't support + // multiple fields in combination w/ derivaties yet. + if s.IsNonNestedDerivative() && len(s.Fields) != 1 { + return fmt.Errorf("derivative cannot be used with other fields") + } + return nil } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 954912f7b1..0bd93fd060 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1001,6 +1001,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT field1 FROM 12`, err: `found 12, expected identifier at line 1, char 20`}, {s: `SELECT 1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 FROM myseries`, err: `unable to parse number at line 1, char 8`}, {s: `SELECT 10.5h FROM myseries`, err: `found h, expected FROM at line 1, char 12`}, + {s: `SELECT derivative(field1), field1 FROM myseries`, err: `derivative cannot be used with other fields`}, {s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`}, {s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`}, {s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, From 7fd9a0acd343c82cf883c8b647c506de5ada23cc Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 12 May 2015 14:34:12 -0600 Subject: [PATCH 3/9] Add validation for derivative arguments Derivative must be of the form derviative(field, duration) or derivative(agg(field), duration). --- influxql/ast.go | 38 +++++++++++++++++++++++++++++++++++++- influxql/functions.go | 12 ++---------- influxql/functions_test.go | 7 ++++++- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 72e1d980d4..337ebbc444 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -885,12 +885,48 @@ func (s *SelectStatement) Validate(tr targetRequirement) error { } } + if err := s.validateDerivative(); err != nil { + return err + } + + return nil +} + +func (s *SelectStatement) validateDerivative() error { + if !s.IsNonNestedDerivative() { + return nil + } + // If a derivative is requested, it must be the only field in the query. We don't support // multiple fields in combination w/ derivaties yet. - if s.IsNonNestedDerivative() && len(s.Fields) != 1 { + if len(s.Fields) != 1 { return fmt.Errorf("derivative cannot be used with other fields") } + aggr := s.FunctionCalls() + if len(aggr) != 1 { + return fmt.Errorf("derivative cannot be used with other fields") + } + + // Derivative requires two arguments + derivativeCall := aggr[0] + if len(derivativeCall.Args) != 2 { + return fmt.Errorf("derivative requires a field and duration argument") + } + + // First arg must be a field or aggr over a field e.g. (mean(field)) + _, callOk := derivativeCall.Args[0].(*Call) + _, varOk := derivativeCall.Args[0].(*VarRef) + + if !(callOk || varOk) { + return fmt.Errorf("derivative requires a field argument") + } + + // Second must be a duration .e.g (1h) + if _, ok := derivativeCall.Args[1].(*DurationLiteral); !ok { + return fmt.Errorf("derivative requires a duration argument") + } + return nil } diff --git a/influxql/functions.go b/influxql/functions.go index ab422c98b5..604ebb280b 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -39,9 +39,9 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { } // Ensure that there is either a single argument or if for percentile, two - if c.Name == "percentile" { + if c.Name == "percentile" || c.Name == "derivative" { if len(c.Args) != 2 { - return nil, fmt.Errorf("expected two arguments for percentile()") + return nil, fmt.Errorf("expected two arguments for %s()", c.Name) } } else if len(c.Args) != 1 { return nil, fmt.Errorf("expected one argument for %s()", c.Name) @@ -86,10 +86,6 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { } return MapEcho, nil case "derivative": - if len(c.Args) == 0 { - return nil, fmt.Errorf("expected argument in derivative()") - } - // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate if fn, ok := c.Args[0].(*Call); ok { @@ -136,10 +132,6 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) { } return ReducePercentile(lit.Val), nil case "derivative": - if len(c.Args) == 0 { - return nil, fmt.Errorf("expected argument in derivative()") - } - // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate if fn, ok := c.Args[0].(*Call); ok { diff --git a/influxql/functions_test.go b/influxql/functions_test.go index 30aa21b528..bd6a32fd31 100644 --- a/influxql/functions_test.go +++ b/influxql/functions_test.go @@ -1,6 +1,9 @@ package influxql -import "testing" +import ( + "testing" + "time" +) import "sort" @@ -117,6 +120,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) { Name: "derivative", Args: []Expr{ &VarRef{Val: " field1"}, + &DurationLiteral{Val: time.Hour}, }, } @@ -130,6 +134,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) { Name: "derivative", Args: []Expr{ &Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}}, + &DurationLiteral{Val: time.Hour}, }, } From a0a4600e7f5eabe1584e5dff5d09833b7acb40b5 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 12 May 2015 16:51:19 -0600 Subject: [PATCH 4/9] Add derivative function Calculates the derivative of consequtive points and normalizes the value to a given interval. It supports simple derivates over fields as well as nested derivatives over another aggregate function. Fixes #1822 --- influxql/ast.go | 25 ++++++++- influxql/engine.go | 114 +++++++++++++++++++++++++++++++++++++++- influxql/parser_test.go | 12 +++++ tx.go | 14 +++-- 4 files changed, 158 insertions(+), 7 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 337ebbc444..ff2f78750f 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -642,7 +642,9 @@ type SelectStatement struct { FillValue interface{} } -func (s *SelectStatement) IsNonNestedDerivative() bool { +// HasDerivative returns true if one of the field in the statement is a +// derivative aggregate +func (s *SelectStatement) HasDerivative() bool { for _, f := range s.Fields { if f.Name() == "derivative" { return true @@ -651,6 +653,25 @@ func (s *SelectStatement) IsNonNestedDerivative() bool { return false } +// IsSimpleDerivative return true if a field is a derivative function with a +// variable ref as the first arg +func (s *SelectStatement) IsSimpleDerivative() bool { + for _, f := range s.Fields { + if f.Name() == "derivative" { + // cast to derivative call + if d, ok := f.Expr.(*Call); ok { + + // it's nested if the first argument is an aggregate function + if _, ok := d.Args[0].(*VarRef); ok { + return true + } + } + return false + } + } + return false +} + // Clone returns a deep copy of the statement. func (s *SelectStatement) Clone() *SelectStatement { clone := &SelectStatement{ @@ -893,7 +914,7 @@ func (s *SelectStatement) Validate(tr targetRequirement) error { } func (s *SelectStatement) validateDerivative() error { - if !s.IsNonNestedDerivative() { + if !s.HasDerivative() { return nil } diff --git a/influxql/engine.go b/influxql/engine.go index 0764cd6d47..caf18ee282 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -76,8 +76,8 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { } defer m.Close() - // if it's a raw query we handle processing differently - if m.stmt.IsRawQuery { + // if it's a raw query or a non-nested derivative we handle processing differently + if m.stmt.IsRawQuery || m.stmt.IsSimpleDerivative() { m.processRawQuery(out, filterEmptyResults) return } @@ -196,6 +196,9 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { // handle any fill options resultValues = m.processFill(resultValues) + // process derivatives + resultValues = m.processDerivative(resultValues) + row := &Row{ Name: m.MeasurementName, Tags: m.TagSet.Tags, @@ -228,6 +231,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { valuesOffset := 0 valuesToReturn := make([]*rawQueryMapOutput, 0) + var lastValueFromPreviousChunk *rawQueryMapOutput // loop until we've emptied out all the mappers and sent everything out for { // collect up to the limit for each mapper @@ -324,6 +328,10 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { // hit the chunk size? Send out what has been accumulated, but keep // processing. if len(valuesToReturn) >= m.chunkSize { + lastValueFromPreviousChunk = valuesToReturn[len(valuesToReturn)-1] + + valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn) + row := m.processRawResults(valuesToReturn) // perform post-processing, such as math. row.Values = m.processResults(row.Values) @@ -342,6 +350,8 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { out <- m.processRawResults(nil) } } else { + valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn) + row := m.processRawResults(valuesToReturn) // perform post-processing, such as math. row.Values = m.processResults(row.Values) @@ -349,6 +359,106 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { } } +// derivativeInterval returns the time interval for the one (and only) derivative func +func (m *MapReduceJob) derivativeInterval() time.Duration { + return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val +} + +func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput { + // If we're called and do not have a derivative aggregate function, then return what was passed in + if !m.stmt.HasDerivative() { + return valuesToReturn + } + + if len(valuesToReturn) == 0 { + return valuesToReturn + } + + // If we only have 1 value, then the value did not change, so return + // a single row w/ 0.0 + if len(valuesToReturn) == 1 { + return []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: valuesToReturn[0].Time, + Values: 0.0, + }, + } + } + + if lastValueFromPreviousChunk == nil { + lastValueFromPreviousChunk = valuesToReturn[0] + } + + // The duration to normalize the derivative by. This is so the derivative values + // can be expressed as "per second", etc.. within each time segment + interval := m.derivativeInterval() + + derivativeValues := make([]*rawQueryMapOutput, len(valuesToReturn)-1) + for i := 1; i < len(valuesToReturn); i++ { + v := valuesToReturn[i] + + // Calculate the derivate of successive points by dividing the difference + // of each value by the elapsed time normalized to the interval + diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64) + elapsed := v.Time - lastValueFromPreviousChunk.Time + + derivativeValues[i-1] = &rawQueryMapOutput{ + Time: v.Time, + Values: diff / (float64(elapsed) / float64(interval)), + } + lastValueFromPreviousChunk = v + } + + return derivativeValues +} + +// processDerivative returns the derivatives of the results +func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{} { + + // Return early if we're not supposed to process the derivatives + if !m.stmt.HasDerivative() { + return results + } + + // Return early if we can't calculate derivatives + if len(results) == 0 { + return results + } + + // If we only have 1 value, then the value did not change, so return + // a single row w/ 0.0 + if len(results) == 1 { + return [][]interface{}{ + []interface{}{results[0][0], 0.0}, + } + } + + // Otherwise calculate the derivatives as the difference between consequtive + // points divided by the elapsed time. Then normalize to the requested + // interval. + derivatives := make([][]interface{}, len(results)-1) + for i := 1; i < len(results); i++ { + prev := results[i-1] + cur := results[i] + + if cur[1] == nil || prev[1] == nil { + derivatives[i-1] = cur + continue + } + + elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time)) + diff := cur[1].(float64) - prev[1].(float64) + + val := []interface{}{ + cur[0], + float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())), + } + derivatives[i-1] = val + } + + return derivatives +} + // processsResults will apply any math that was specified in the select statement against the passed in results func (m *MapReduceJob) processResults(results [][]interface{}) [][]interface{} { hasMath := false diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 0bd93fd060..62add5f989 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -106,6 +106,18 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SELECT statement + { + s: `SELECT derivative(field1, 1h) FROM myseries;`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "derivative", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.DurationLiteral{Val: time.Hour}}}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, + }, + }, + // SELECT statement (lowercase) { s: `select my_field from myseries`, diff --git a/tx.go b/tx.go index c087558e1c..65b08dbcfb 100644 --- a/tx.go +++ b/tx.go @@ -93,13 +93,20 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri } } - // If a numerical aggregate is requested, ensure it is only performed on numeric data. + // If a numerical aggregate is requested, ensure it is only performed on numeric data or on a + // nested aggregate on numeric data. for _, a := range stmt.FunctionCalls() { - lit, ok := a.Args[0].(*influxql.VarRef) + // Check for fields like `derivative(mean(value), 1d)` + var nested *influxql.Call = a + if fn, ok := nested.Args[0].(*influxql.Call); ok { + nested = fn + } + + lit, ok := nested.Args[0].(*influxql.VarRef) if !ok { return nil, fmt.Errorf("aggregate call didn't contain a field %s", a.String()) } - if influxql.IsNumeric(a) { + if influxql.IsNumeric(nested) { f := m.FieldByName(lit.Val) if f.Type != influxql.Float && f.Type != influxql.Integer { return nil, fmt.Errorf("aggregate '%s' requires numerical field values. Field '%s' is of type %s", @@ -348,6 +355,7 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) l.limit = math.MaxUint64 } } else { + // Check for calls like `derivative(mean(value), 1d)` var nested *influxql.Call = c if fn, ok := c.Args[0].(*influxql.Call); ok { nested = fn From 98521b273e73447e71557946b8a19cba384d88df Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 13 May 2015 14:03:58 -0600 Subject: [PATCH 5/9] Add non_negative_derivative Fixes #1477 --- influxql/ast.go | 4 +- influxql/engine.go | 46 +++-- influxql/engine_test.go | 383 +++++++++++++++++++++++++++++++++++++ influxql/functions.go | 15 +- influxql/functions_test.go | 67 +++---- 5 files changed, 463 insertions(+), 52 deletions(-) create mode 100644 influxql/engine_test.go diff --git a/influxql/ast.go b/influxql/ast.go index ff2f78750f..773d574a73 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -646,7 +646,7 @@ type SelectStatement struct { // derivative aggregate func (s *SelectStatement) HasDerivative() bool { for _, f := range s.Fields { - if f.Name() == "derivative" { + if strings.HasSuffix(f.Name(), "derivative") { return true } } @@ -657,7 +657,7 @@ func (s *SelectStatement) HasDerivative() bool { // variable ref as the first arg func (s *SelectStatement) IsSimpleDerivative() bool { for _, f := range s.Fields { - if f.Name() == "derivative" { + if strings.HasSuffix(f.Name(), "derivative") { // cast to derivative call if d, ok := f.Expr.(*Call); ok { diff --git a/influxql/engine.go b/influxql/engine.go index caf18ee282..2deb9e3ca5 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -364,6 +364,10 @@ func (m *MapReduceJob) derivativeInterval() time.Duration { return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val } +func (m *MapReduceJob) isNonNegativeDerivative() bool { + return m.stmt.FunctionCalls()[0].Name == "non_negative_derivative" +} + func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput { // If we're called and do not have a derivative aggregate function, then return what was passed in if !m.stmt.HasDerivative() { @@ -389,24 +393,31 @@ func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *raw lastValueFromPreviousChunk = valuesToReturn[0] } - // The duration to normalize the derivative by. This is so the derivative values - // can be expressed as "per second", etc.. within each time segment - interval := m.derivativeInterval() + // Determines whether to drop negative differences + isNonNegative := m.isNonNegativeDerivative() - derivativeValues := make([]*rawQueryMapOutput, len(valuesToReturn)-1) + derivativeValues := []*rawQueryMapOutput{} for i := 1; i < len(valuesToReturn); i++ { v := valuesToReturn[i] - // Calculate the derivate of successive points by dividing the difference + // Calculate the derivative of successive points by dividing the difference // of each value by the elapsed time normalized to the interval + var value interface{} diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64) elapsed := v.Time - lastValueFromPreviousChunk.Time + value = diff / (float64(elapsed) / float64(m.derivativeInterval())) - derivativeValues[i-1] = &rawQueryMapOutput{ - Time: v.Time, - Values: diff / (float64(elapsed) / float64(interval)), - } lastValueFromPreviousChunk = v + + // Drop negative values for non-negative derivatives + if isNonNegative && diff < 0 { + continue + } + + derivativeValues = append(derivativeValues, &rawQueryMapOutput{ + Time: v.Time, + Values: value, + }) } return derivativeValues @@ -433,27 +444,36 @@ func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{ } } + // Determines whether to drop negative differences + isNonNegative := m.isNonNegativeDerivative() + // Otherwise calculate the derivatives as the difference between consequtive // points divided by the elapsed time. Then normalize to the requested // interval. - derivatives := make([][]interface{}, len(results)-1) + derivatives := [][]interface{}{} for i := 1; i < len(results); i++ { prev := results[i-1] cur := results[i] if cur[1] == nil || prev[1] == nil { - derivatives[i-1] = cur continue } + var value interface{} elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time)) diff := cur[1].(float64) - prev[1].(float64) + value = float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())) + + // Drop negative values for non-negative derivatives + if isNonNegative && diff < 0 { + continue + } val := []interface{}{ cur[0], - float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())), + value, } - derivatives[i-1] = val + derivatives = append(derivatives, val) } return derivatives diff --git a/influxql/engine_test.go b/influxql/engine_test.go new file mode 100644 index 0000000000..7ee48ba0ee --- /dev/null +++ b/influxql/engine_test.go @@ -0,0 +1,383 @@ +package influxql + +import ( + "fmt" + "testing" + "time" +) + +func derivativeJob(t *testing.T, fn, interval string) *MapReduceJob { + q, err := ParseQuery(fmt.Sprintf("SELECT %s(mean(value), %s) FROM foo", fn, interval)) + if err != nil { + t.Fatalf("failed to parse query: %s", err) + } + m := &MapReduceJob{ + stmt: q.Statements[0].(*SelectStatement), + } + + return m +} + +func TestProcessDerivative(t *testing.T) { + tests := []struct { + name string + fn string + interval string + in [][]interface{} + exp [][]interface{} + }{ + { + name: "empty input", + fn: "derivative", + interval: "1d", + in: [][]interface{}{}, + exp: [][]interface{}{}, + }, + + { + name: "single row returns 0.0", + fn: "derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 0.0, + }, + }, + }, + { + name: "basic derivative", + fn: "derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 3.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 5.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 9.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + }, + { + name: "12h interval", + fn: "derivative", + interval: "12h", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 3.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 0.5, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 0.5, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 0.5, + }, + }, + }, + { + name: "negative derivatives", + fn: "derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 0.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), -2.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + }, + { + name: "negative derivatives", + fn: "non_negative_derivative", + interval: "1d", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0, + }, + // Show resultes in negative derivative + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 0.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0, + }, + }, + }, + } + + for _, test := range tests { + m := derivativeJob(t, test.fn, test.interval) + got := m.processDerivative(test.in) + + if len(got) != len(test.exp) { + t.Fatalf("processDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp)) + } + + for i := 0; i < len(test.exp); i++ { + if test.exp[i][0] != got[i][0] || test.exp[i][1] != got[i][1] { + t.Fatalf("processDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp) + } + } + } +} + +func TestProcessRawQueryDerivative(t *testing.T) { + tests := []struct { + name string + fn string + interval string + in []*rawQueryMapOutput + exp []*rawQueryMapOutput + }{ + { + name: "empty input", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{}, + exp: []*rawQueryMapOutput{}, + }, + + { + name: "single row returns 0.0", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 1.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 0.0, + }, + }, + }, + { + name: "basic derivative", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 0.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 3.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 5.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 9.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 2.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 2.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + }, + { + name: "12h interval", + fn: "derivative", + interval: "12h", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).UnixNano(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 2.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 3.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 0.5, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 0.5, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 0.5, + }, + }, + }, + { + name: "negative derivatives", + fn: "derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 2.0, + }, + // should go negative + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 0.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: -2.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + }, + { + name: "negative derivatives", + fn: "non_negative_derivative", + interval: "1d", + in: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Unix(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 2.0, + }, + // should go negative + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), + Values: 0.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + exp: []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), + Values: 1.0, + }, + &rawQueryMapOutput{ + Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(), + Values: 4.0, + }, + }, + }, + } + + for _, test := range tests { + m := derivativeJob(t, test.fn, test.interval) + got := m.processRawQueryDerivative(nil, test.in) + + if len(got) != len(test.exp) { + t.Fatalf("processRawQueryDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp)) + } + + for i := 0; i < len(test.exp); i++ { + if test.exp[i].Time != got[i].Time || (test.exp[i].Values.(float64)-got[i].Values.(float64)) > 0.000000001 { + t.Fatalf("processRawQueryDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp) + } + } + } +} diff --git a/influxql/functions.go b/influxql/functions.go index 604ebb280b..4f64aa3739 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -12,6 +12,7 @@ import ( "math" "math/rand" "sort" + "strings" ) // Iterator represents a forward-only iterator over a set of points. @@ -39,7 +40,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { } // Ensure that there is either a single argument or if for percentile, two - if c.Name == "percentile" || c.Name == "derivative" { + if c.Name == "percentile" || strings.HasSuffix(c.Name, "derivative") { if len(c.Args) != 2 { return nil, fmt.Errorf("expected two arguments for %s()", c.Name) } @@ -49,7 +50,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { // derivative can take a nested aggregate function, everything else expects // a variable reference as the first arg - if c.Name != "derivative" { + if !strings.HasSuffix(c.Name, "derivative") { // Ensure the argument is a variable reference. _, ok := c.Args[0].(*VarRef) if !ok { @@ -85,7 +86,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { return nil, fmt.Errorf("expected float argument in percentile()") } return MapEcho, nil - case "derivative": + case "derivative", "non_negative_derivative": // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate if fn, ok := c.Args[0].(*Call); ok { @@ -131,13 +132,13 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) { return nil, fmt.Errorf("expected float argument in percentile()") } return ReducePercentile(lit.Val), nil - case "derivative": + case "derivative", "non_negative_derivative": // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate if fn, ok := c.Args[0].(*Call); ok { return InitializeReduceFunc(fn) } - return nil, fmt.Errorf("expected function argument to derivative: %q", c.Name) + return nil, fmt.Errorf("expected function argument to %s", c.Name) default: return nil, fmt.Errorf("function not found: %q", c.Name) } @@ -792,6 +793,10 @@ type rawQueryMapOutput struct { Values interface{} } +func (r *rawQueryMapOutput) String() string { + return fmt.Sprintf("{%#v %#v}", r.Time, r.Values) +} + type rawOutputs []*rawQueryMapOutput func (a rawOutputs) Len() int { return len(a) } diff --git a/influxql/functions_test.go b/influxql/functions_test.go index bd6a32fd31..e9ed06d30d 100644 --- a/influxql/functions_test.go +++ b/influxql/functions_test.go @@ -104,43 +104,46 @@ func TestInitializeMapFuncPercentile(t *testing.T) { } func TestInitializeMapFuncDerivative(t *testing.T) { - // No args should fail - c := &Call{ - Name: "derivative", - Args: []Expr{}, - } - _, err := InitializeMapFunc(c) - if err == nil { - t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) - } + for _, fn := range []string{"derivative", "non_negative_derivative"} { + // No args should fail + c := &Call{ + Name: fn, + Args: []Expr{}, + } - // Single field arg should return MapEcho - c = &Call{ - Name: "derivative", - Args: []Expr{ - &VarRef{Val: " field1"}, - &DurationLiteral{Val: time.Hour}, - }, - } + _, err := InitializeMapFunc(c) + if err == nil { + t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) + } - _, err = InitializeMapFunc(c) - if err != nil { - t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) - } + // Single field arg should return MapEcho + c = &Call{ + Name: fn, + Args: []Expr{ + &VarRef{Val: " field1"}, + &DurationLiteral{Val: time.Hour}, + }, + } - // Nested Aggregate func should return the map func for the nested aggregate - c = &Call{ - Name: "derivative", - Args: []Expr{ - &Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}}, - &DurationLiteral{Val: time.Hour}, - }, - } + _, err = InitializeMapFunc(c) + if err != nil { + t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) + } - _, err = InitializeMapFunc(c) - if err != nil { - t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) + // Nested Aggregate func should return the map func for the nested aggregate + c = &Call{ + Name: fn, + Args: []Expr{ + &Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}}, + &DurationLiteral{Val: time.Hour}, + }, + } + + _, err = InitializeMapFunc(c) + if err != nil { + t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) + } } } From a3edae082b90d7f05bab378b29d81e06af36ad14 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 13 May 2015 16:12:46 -0600 Subject: [PATCH 6/9] Update changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b001e1c0e..ec34f39493 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,16 @@ ## v0.9.0-rc31 [unreleased] +### Features +- [#1822](https://github.com/influxdb/influxdb/issues/1822): Wire up DERIVATIVE aggregate +- [#1477](https://github.com/influxdb/influxdb/issues/1477): Wire up non_negative_derivative function + ### Bugfixes - [#2545](https://github.com/influxdb/influxdb/pull/2545): Use "value" as the field name for graphite input. Thanks @cannium. - [#2558](https://github.com/influxdb/influxdb/pull/2558): Fix client response check - thanks @vladlopes! +## PRs +- [#2569](https://github.com/influxdb/influxdb/pull/2569): Add derivative functions + ## v0.9.0-rc30 [2015-05-12] ### Release Notes From faa099a382ffb6b5130b74bfa7a76efaa9fb28fa Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 14 May 2015 14:53:17 -0600 Subject: [PATCH 7/9] Fix code review comments --- influxql/ast.go | 23 +++++++++-------------- influxql/engine.go | 5 ++--- influxql/engine_test.go | 9 +++++++-- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 773d574a73..5af25fbc97 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -642,31 +642,26 @@ type SelectStatement struct { FillValue interface{} } -// HasDerivative returns true if one of the field in the statement is a +// HasDerivative returns true if one of the function calls in the statement is a // derivative aggregate func (s *SelectStatement) HasDerivative() bool { - for _, f := range s.Fields { - if strings.HasSuffix(f.Name(), "derivative") { + for _, f := range s.FunctionCalls() { + if strings.HasSuffix(f.Name, "derivative") { return true } } return false } -// IsSimpleDerivative return true if a field is a derivative function with a +// IsSimpleDerivative return true if one of the function call is a derivative function with a // variable ref as the first arg func (s *SelectStatement) IsSimpleDerivative() bool { - for _, f := range s.Fields { - if strings.HasSuffix(f.Name(), "derivative") { - // cast to derivative call - if d, ok := f.Expr.(*Call); ok { - - // it's nested if the first argument is an aggregate function - if _, ok := d.Args[0].(*VarRef); ok { - return true - } + for _, f := range s.FunctionCalls() { + if strings.HasSuffix(f.Name, "derivative") { + // it's nested if the first argument is an aggregate function + if _, ok := f.Args[0].(*VarRef); ok { + return true } - return false } } return false diff --git a/influxql/engine.go b/influxql/engine.go index 2deb9e3ca5..d48c66a787 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -379,7 +379,7 @@ func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *raw } // If we only have 1 value, then the value did not change, so return - // a single row w/ 0.0 + // a single row with 0.0 if len(valuesToReturn) == 1 { return []*rawQueryMapOutput{ &rawQueryMapOutput{ @@ -425,7 +425,6 @@ func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *raw // processDerivative returns the derivatives of the results func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{} { - // Return early if we're not supposed to process the derivatives if !m.stmt.HasDerivative() { return results @@ -447,7 +446,7 @@ func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{ // Determines whether to drop negative differences isNonNegative := m.isNonNegativeDerivative() - // Otherwise calculate the derivatives as the difference between consequtive + // Otherwise calculate the derivatives as the difference between consecutive // points divided by the elapsed time. Then normalize to the requested // interval. derivatives := [][]interface{}{} diff --git a/influxql/engine_test.go b/influxql/engine_test.go index 7ee48ba0ee..9a6cf7ed73 100644 --- a/influxql/engine_test.go +++ b/influxql/engine_test.go @@ -2,6 +2,7 @@ package influxql import ( "fmt" + "math" "testing" "time" ) @@ -18,6 +19,8 @@ func derivativeJob(t *testing.T, fn, interval string) *MapReduceJob { return m } +// TestProccessDerivative tests the processDerivative transformation function on the engine. +// The is called for a query with a group by. func TestProcessDerivative(t *testing.T) { tests := []struct { name string @@ -185,6 +188,8 @@ func TestProcessDerivative(t *testing.T) { } } +// TestProcessRawQueryDerivative tests the processRawQueryDerivative transformation function on the engine. +// The is called for a queries that do not have a group by. func TestProcessRawQueryDerivative(t *testing.T) { tests := []struct { name string @@ -243,7 +248,7 @@ func TestProcessRawQueryDerivative(t *testing.T) { exp: []*rawQueryMapOutput{ &rawQueryMapOutput{ Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(), - Values: 2.0, + Values: 3.0, }, &rawQueryMapOutput{ Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(), @@ -375,7 +380,7 @@ func TestProcessRawQueryDerivative(t *testing.T) { } for i := 0; i < len(test.exp); i++ { - if test.exp[i].Time != got[i].Time || (test.exp[i].Values.(float64)-got[i].Values.(float64)) > 0.000000001 { + if test.exp[i].Time != got[i].Time || math.Abs((test.exp[i].Values.(float64)-got[i].Values.(float64))) > 0.0000001 { t.Fatalf("processRawQueryDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp) } } From 34007a8be903f576feaf43ca1ebeb615461292ac Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 14 May 2015 15:11:00 -0600 Subject: [PATCH 8/9] Make duration argument optional for derivatives If it's not specified, it defaults to 1s for raw queries and to the group by duration on group by queries. --- influxql/ast.go | 13 ++++++++----- influxql/engine.go | 8 +++++++- influxql/engine_test.go | 37 ++++++++++++++++++++++++++++++++++++- influxql/functions.go | 7 ++++++- 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 5af25fbc97..35e3bcc210 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -926,8 +926,8 @@ func (s *SelectStatement) validateDerivative() error { // Derivative requires two arguments derivativeCall := aggr[0] - if len(derivativeCall.Args) != 2 { - return fmt.Errorf("derivative requires a field and duration argument") + if len(derivativeCall.Args) == 0 { + return fmt.Errorf("derivative requires a field argument") } // First arg must be a field or aggr over a field e.g. (mean(field)) @@ -938,9 +938,12 @@ func (s *SelectStatement) validateDerivative() error { return fmt.Errorf("derivative requires a field argument") } - // Second must be a duration .e.g (1h) - if _, ok := derivativeCall.Args[1].(*DurationLiteral); !ok { - return fmt.Errorf("derivative requires a duration argument") + // If a duration arg is pased, make sure it's a duration + if len(derivativeCall.Args) == 2 { + // Second must be a duration .e.g (1h) + if _, ok := derivativeCall.Args[1].(*DurationLiteral); !ok { + return fmt.Errorf("derivative requires a duration argument") + } } return nil diff --git a/influxql/engine.go b/influxql/engine.go index d48c66a787..a6f32447a9 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -361,7 +361,13 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { // derivativeInterval returns the time interval for the one (and only) derivative func func (m *MapReduceJob) derivativeInterval() time.Duration { - return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val + if len(m.stmt.FunctionCalls()[0].Args) == 2 { + return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val + } + if m.stmt.groupByInterval > 0 { + return m.stmt.groupByInterval + } + return time.Second } func (m *MapReduceJob) isNonNegativeDerivative() bool { diff --git a/influxql/engine_test.go b/influxql/engine_test.go index 9a6cf7ed73..86aaba69ef 100644 --- a/influxql/engine_test.go +++ b/influxql/engine_test.go @@ -8,7 +8,12 @@ import ( ) func derivativeJob(t *testing.T, fn, interval string) *MapReduceJob { - q, err := ParseQuery(fmt.Sprintf("SELECT %s(mean(value), %s) FROM foo", fn, interval)) + + if interval != "" { + interval = ", " + interval + } + + q, err := ParseQuery(fmt.Sprintf("SELECT %s(mean(value)%s) FROM foo", fn, interval)) if err != nil { t.Fatalf("failed to parse query: %s", err) } @@ -52,6 +57,36 @@ func TestProcessDerivative(t *testing.T) { }, }, }, + { + name: "derivative normalized to 1s by default", + fn: "derivative", + interval: "", + in: [][]interface{}{ + []interface{}{ + time.Unix(0, 0), 1.0, + }, + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 3.0, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 5.0, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 9.0, + }, + }, + exp: [][]interface{}{ + []interface{}{ + time.Unix(0, 0).Add(24 * time.Hour), 2.0 / 86400, + }, + []interface{}{ + time.Unix(0, 0).Add(48 * time.Hour), 2.0 / 86400, + }, + []interface{}{ + time.Unix(0, 0).Add(72 * time.Hour), 4.0 / 86400, + }, + }, + }, { name: "basic derivative", fn: "derivative", diff --git a/influxql/functions.go b/influxql/functions.go index 4f64aa3739..48f932b916 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -40,10 +40,15 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { } // Ensure that there is either a single argument or if for percentile, two - if c.Name == "percentile" || strings.HasSuffix(c.Name, "derivative") { + if c.Name == "percentile" { if len(c.Args) != 2 { return nil, fmt.Errorf("expected two arguments for %s()", c.Name) } + } else if strings.HasSuffix(c.Name, "derivative") { + // derivatives require a field name and optional duration + if len(c.Args) == 0 { + return nil, fmt.Errorf("expected field name argument for %s()", c.Name) + } } else if len(c.Args) != 1 { return nil, fmt.Errorf("expected one argument for %s()", c.Name) } From 61712d82fb76feb724f9b304476f620faeebaa90 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 14 May 2015 15:20:22 -0600 Subject: [PATCH 9/9] Prevent division by 0 for derivative --- influxql/engine.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/influxql/engine.go b/influxql/engine.go index a6f32447a9..69e4e6fe61 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -408,10 +408,13 @@ func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *raw // Calculate the derivative of successive points by dividing the difference // of each value by the elapsed time normalized to the interval - var value interface{} diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64) elapsed := v.Time - lastValueFromPreviousChunk.Time - value = diff / (float64(elapsed) / float64(m.derivativeInterval())) + + value := 0.0 + if elapsed > 0 { + value = diff / (float64(elapsed) / float64(m.derivativeInterval())) + } lastValueFromPreviousChunk = v @@ -464,10 +467,12 @@ func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{ continue } - var value interface{} elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time)) diff := cur[1].(float64) - prev[1].(float64) - value = float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())) + value := 0.0 + if elapsed > 0 { + value = float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())) + } // Drop negative values for non-negative derivatives if isNonNegative && diff < 0 {