diff --git a/engine/aggregator_engine.go b/engine/aggregator_engine.go index a181eeeff2..6b604721dc 100644 --- a/engine/aggregator_engine.go +++ b/engine/aggregator_engine.go @@ -205,7 +205,9 @@ func (self *AggregatorEngine) aggregateValuesForSeries(series *protocol.Series) // update the state of the given group node := seriesState.trie.GetNode(group) var err error + log4go.Trace("Aggregating for group %v", group) for idx, aggregator := range self.aggregators { + log4go.Trace("Aggregating value for %T for group %v and state %v", aggregator, group, node.states[idx]) node.states[idx], err = aggregator.AggregatePoint(node.states[idx], point) if err != nil { return false, err diff --git a/engine/aggregator_operators.go b/engine/aggregator_operators.go index 9110992dc1..b32c2735bb 100644 --- a/engine/aggregator_operators.go +++ b/engine/aggregator_operators.go @@ -363,6 +363,8 @@ type DerivativeAggregatorState struct { type DerivativeAggregator struct { AbstractAggregator + duration *time.Duration // if it's group by time() + lastState *DerivativeAggregatorState defaultValue *protocol.FieldValue alias string } @@ -393,12 +395,27 @@ func (self *DerivativeAggregator) AggregatePoint(state interface{}, p *protocol. s = &DerivativeAggregatorState{} } + // starting a new bucket? (only for group by time()) + if s != self.lastState && self.duration != nil { + // if there was a previous bucket, update its lastValue + if self.lastState != nil { + self.lastState.lastValue = newValue + } + // save the current state as the last + self.lastState = s + } + if s.firstValue == nil { s.firstValue = newValue return s, nil } - s.lastValue = newValue + if self.duration == nil { + s.lastValue = newValue + } else { + s.lastValue = s.firstValue + } + return s, nil } @@ -444,13 +461,20 @@ func NewDerivativeAggregator(q *parser.SelectQuery, v *parser.Value, defaultValu return nil, err } - return &DerivativeAggregator{ + da := &DerivativeAggregator{ AbstractAggregator: AbstractAggregator{ value: v.Elems[0], }, defaultValue: wrappedDefaultValue, alias: v.Alias, - }, nil + } + + da.duration, _, err = q.GetGroupByClause().GetGroupByTime() + if err != nil { + return nil, err + } + + return da, nil } // diff --git a/integration/data_test.go b/integration/data_test.go index 66596bd03d..8c7a32a346 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -2309,10 +2309,10 @@ var ( [ { "points": [ + [310000, 400.0], [300000, 30.0], [120000, 20.0], - [60000, 10.0], - [310000, 15.0] + [60000, 5.0] ], "name": "data", "columns": ["time", "value"] @@ -2340,7 +2340,7 @@ func (self *DataTestSuite) tstAggregateFill(tstData, aggregate, fill string, agg // write test data to the database self.client.WriteJsonData(tstData, c, influxdb.Millisecond) // build the test query string - query := fmtFillQuery(aggregate, aggArgs, "data", fill) + query := fmtQuery(aggregate, aggArgs, "data", fill) // run the query series := self.client.RunQuery(query, c) // check that we only got one result series @@ -2357,12 +2357,17 @@ func (self *DataTestSuite) tstAggregateFill(tstData, aggregate, fill string, agg } } -func fmtFillQuery(aggregate string, aggArgs []interface{}, series, fill string) string { +func fmtQuery(aggregate string, aggArgs []interface{}, series, fill string) string { args := "value" for _, arg := range aggArgs { args = fmt.Sprintf("%s, %v", args, arg) } - return fmt.Sprintf("select %s(%s) from %s group by time(60s) fill(%s) where time > 60s and time < 320s", aggregate, args, series, fill) + + if fill != "" { + return fmt.Sprintf("select %s(%s) from %s group by time(60s) fill(%s) where time > 60s and time < 320s", aggregate, args, series, fill) + } + + return fmt.Sprintf("select %s(%s) from %s group by time(60s) where time > 60s and time < 320s", aggregate, args, series) } var emptyAggArgs []interface{} @@ -2373,6 +2378,53 @@ type tv struct { Value interface{} } +// Test Derivative with consecutive buckets and filling later buckets +func (self *DataTestSuite) TestIssue334DerivativeConsecutiveBucketsFillLater(c *C) { + data := ` +[ + { + "name": "data", + "columns": ["time", "value"], + "points": [ + [130000, 80.0], + [120000, 40.0], + [70000, 20.0], + [60000, 10.0] + ] + } +]` + // the 120000 bucket includes the points 80.0, 40.0 and the new value from the next bucket 20.0 + // the 60000 bucket includes two points only 20.0 and 10.0 + expect := []tv{{300000.0, nil}, {240000.0, nil}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, nil}} + self.tstAggregateFill(data, "derivative", "null", emptyAggArgs, expect, c) +} + +// Test Derivative with non-consecutive buckets and filling in between +func (self *DataTestSuite) TestIssue334DerivativeNonConsecutiveBucketsFillBetween(c *C) { + data := ` +[ + { + "name": "data", + "columns": ["time", "value"], + "points": [ + [250000, 320.0], + [240000, 90.0], + [130000, 80.0], + [120000, 40.0], + [70000, 20.0], + [60000, 10.0] + ] + } +]` + // The 300000 bucket includes no points + // the 240000 bucket includes 320.0, 90.0 and 80.0 from the following bucket (240.0 / 120 = 2) + // the 180000 bucket includes no points + // the 120000 bucket includes 80.0, 40.0 and 20.0 from the next bucket (60.0 / 60 = 1) + // the 60000 bucket includes 20.0 and 10.0 (10 / 10 = 1) + expect := []tv{{300000.0, nil}, {240000.0, 2.0}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, nil}} + self.tstAggregateFill(data, "derivative", "null", emptyAggArgs, expect, c) +} + // count aggregate filling with null func (self *DataTestSuite) TestCountAggregateFillWithNull(c *C) { expVals := []tv{{300000.0, 1.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, 1.0}} @@ -2537,32 +2589,32 @@ func (self *DataTestSuite) TestBottom10AggregateFillWith0(c *C) { // derivative aggregate filling with null func (self *DataTestSuite) TestDerivativeAggregateFillWithNull(c *C) { - expVals := []tv{{300000.0, -1.5}, {240000.0, nil}, {180000.0, nil}} + expVals := []tv{{300000.0, 2.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, 0.25}} self.tstAggregateFill(aggTstData2, "derivative", "null", emptyAggArgs, expVals, c) } // derivative aggregate filling with 0 func (self *DataTestSuite) TestDerivativeAggregateFillWith0(c *C) { - expVals := []tv{{300000.0, -1.5}, {240000.0, 0.0}, {180000.0, 0.0}} + expVals := []tv{{300000.0, 2.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.25}} self.tstAggregateFill(aggTstData2, "derivative", "0", emptyAggArgs, expVals, c) } // difference aggregate filling with null func (self *DataTestSuite) TestDifferenceAggregateFillWithNull(c *C) { - expVals := []tv{{300000.0, 15.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, nil}, {60000.0, nil}} + expVals := []tv{{300000.0, -370.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, nil}, {60000.0, nil}} self.tstAggregateFill(aggTstData2, "difference", "null", emptyAggArgs, expVals, c) } // difference aggregate filling with 0 func (self *DataTestSuite) TestDifferenceAggregateFillWith0(c *C) { - expVals := []tv{{300000.0, 15.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.0}, {60000.0, 0.0}} + expVals := []tv{{300000.0, -370.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.0}, {60000.0, 0.0}} self.tstAggregateFill(aggTstData2, "difference", "0", emptyAggArgs, expVals, c) } // histogram aggregate filling with null func (self *DataTestSuite) TestHistogramAggregateFillWithNull(c *C) { self.client.WriteJsonData(aggTstData2, c, influxdb.Millisecond) - series := self.client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "data", "null"), c) + series := self.client.RunQuery(fmtQuery("histogram", []interface{}{}, "data", "null"), c) c.Assert(len(series), Equals, 1) maps := ToMap(series[0]) c.Assert(len(maps), Equals, 6) @@ -2574,7 +2626,7 @@ func (self *DataTestSuite) TestHistogramAggregateFillWithNull(c *C) { // histogram aggregate filling with 0 func (self *DataTestSuite) TestHistogramAggregateFillWith0(c *C) { self.client.WriteJsonData(aggTstData2, c, influxdb.Millisecond) - series := self.client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "data", "0"), c) + series := self.client.RunQuery(fmtQuery("histogram", []interface{}{}, "data", "0"), c) c.Assert(len(series), Equals, 1) maps := ToMap(series[0]) c.Assert(len(maps), Equals, 6)