Fix derivative when there is a group by time() and fill

Fix #334
pull/1031/head
David Norton 2014-10-10 10:28:19 -04:00
parent 65e4136a38
commit 403881854c
3 changed files with 92 additions and 14 deletions

View File

@ -205,7 +205,9 @@ func (self *AggregatorEngine) aggregateValuesForSeries(series *protocol.Series)
// update the state of the given group
node := seriesState.trie.GetNode(group)
var err error
log4go.Trace("Aggregating for group %v", group)
for idx, aggregator := range self.aggregators {
log4go.Trace("Aggregating value for %T for group %v and state %v", aggregator, group, node.states[idx])
node.states[idx], err = aggregator.AggregatePoint(node.states[idx], point)
if err != nil {
return false, err

View File

@ -363,6 +363,8 @@ type DerivativeAggregatorState struct {
type DerivativeAggregator struct {
AbstractAggregator
duration *time.Duration // if it's group by time()
lastState *DerivativeAggregatorState
defaultValue *protocol.FieldValue
alias string
}
@ -393,12 +395,27 @@ func (self *DerivativeAggregator) AggregatePoint(state interface{}, p *protocol.
s = &DerivativeAggregatorState{}
}
// starting a new bucket? (only for group by time())
if s != self.lastState && self.duration != nil {
// if there was a previous bucket, update its lastValue
if self.lastState != nil {
self.lastState.lastValue = newValue
}
// save the current state as the last
self.lastState = s
}
if s.firstValue == nil {
s.firstValue = newValue
return s, nil
}
if self.duration == nil {
s.lastValue = newValue
} else {
s.lastValue = s.firstValue
}
return s, nil
}
@ -444,13 +461,20 @@ func NewDerivativeAggregator(q *parser.SelectQuery, v *parser.Value, defaultValu
return nil, err
}
return &DerivativeAggregator{
da := &DerivativeAggregator{
AbstractAggregator: AbstractAggregator{
value: v.Elems[0],
},
defaultValue: wrappedDefaultValue,
alias: v.Alias,
}, nil
}
da.duration, _, err = q.GetGroupByClause().GetGroupByTime()
if err != nil {
return nil, err
}
return da, nil
}
//

View File

@ -2309,10 +2309,10 @@ var (
[
{
"points": [
[310000, 400.0],
[300000, 30.0],
[120000, 20.0],
[60000, 10.0],
[310000, 15.0]
[60000, 5.0]
],
"name": "data",
"columns": ["time", "value"]
@ -2340,7 +2340,7 @@ func (self *DataTestSuite) tstAggregateFill(tstData, aggregate, fill string, agg
// write test data to the database
self.client.WriteJsonData(tstData, c, influxdb.Millisecond)
// build the test query string
query := fmtFillQuery(aggregate, aggArgs, "data", fill)
query := fmtQuery(aggregate, aggArgs, "data", fill)
// run the query
series := self.client.RunQuery(query, c)
// check that we only got one result series
@ -2357,14 +2357,19 @@ func (self *DataTestSuite) tstAggregateFill(tstData, aggregate, fill string, agg
}
}
func fmtFillQuery(aggregate string, aggArgs []interface{}, series, fill string) string {
func fmtQuery(aggregate string, aggArgs []interface{}, series, fill string) string {
args := "value"
for _, arg := range aggArgs {
args = fmt.Sprintf("%s, %v", args, arg)
}
if fill != "" {
return fmt.Sprintf("select %s(%s) from %s group by time(60s) fill(%s) where time > 60s and time < 320s", aggregate, args, series, fill)
}
return fmt.Sprintf("select %s(%s) from %s group by time(60s) where time > 60s and time < 320s", aggregate, args, series)
}
var emptyAggArgs []interface{}
// tv holds a time / value pair (at this time, the value was)
@ -2373,6 +2378,53 @@ type tv struct {
Value interface{}
}
// Test Derivative with consecutive buckets and filling later buckets
func (self *DataTestSuite) TestIssue334DerivativeConsecutiveBucketsFillLater(c *C) {
data := `
[
{
"name": "data",
"columns": ["time", "value"],
"points": [
[130000, 80.0],
[120000, 40.0],
[70000, 20.0],
[60000, 10.0]
]
}
]`
// the 120000 bucket includes the points 80.0, 40.0 and the new value from the next bucket 20.0
// the 60000 bucket includes two points only 20.0 and 10.0
expect := []tv{{300000.0, nil}, {240000.0, nil}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, nil}}
self.tstAggregateFill(data, "derivative", "null", emptyAggArgs, expect, c)
}
// Test Derivative with non-consecutive buckets and filling in between
func (self *DataTestSuite) TestIssue334DerivativeNonConsecutiveBucketsFillBetween(c *C) {
data := `
[
{
"name": "data",
"columns": ["time", "value"],
"points": [
[250000, 320.0],
[240000, 90.0],
[130000, 80.0],
[120000, 40.0],
[70000, 20.0],
[60000, 10.0]
]
}
]`
// The 300000 bucket includes no points
// the 240000 bucket includes 320.0, 90.0 and 80.0 from the following bucket (240.0 / 120 = 2)
// the 180000 bucket includes no points
// the 120000 bucket includes 80.0, 40.0 and 20.0 from the next bucket (60.0 / 60 = 1)
// the 60000 bucket includes 20.0 and 10.0 (10 / 10 = 1)
expect := []tv{{300000.0, nil}, {240000.0, 2.0}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, nil}}
self.tstAggregateFill(data, "derivative", "null", emptyAggArgs, expect, c)
}
// count aggregate filling with null
func (self *DataTestSuite) TestCountAggregateFillWithNull(c *C) {
expVals := []tv{{300000.0, 1.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, 1.0}}
@ -2537,32 +2589,32 @@ func (self *DataTestSuite) TestBottom10AggregateFillWith0(c *C) {
// derivative aggregate filling with null
func (self *DataTestSuite) TestDerivativeAggregateFillWithNull(c *C) {
expVals := []tv{{300000.0, -1.5}, {240000.0, nil}, {180000.0, nil}}
expVals := []tv{{300000.0, 2.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, 0.25}}
self.tstAggregateFill(aggTstData2, "derivative", "null", emptyAggArgs, expVals, c)
}
// derivative aggregate filling with 0
func (self *DataTestSuite) TestDerivativeAggregateFillWith0(c *C) {
expVals := []tv{{300000.0, -1.5}, {240000.0, 0.0}, {180000.0, 0.0}}
expVals := []tv{{300000.0, 2.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.25}}
self.tstAggregateFill(aggTstData2, "derivative", "0", emptyAggArgs, expVals, c)
}
// difference aggregate filling with null
func (self *DataTestSuite) TestDifferenceAggregateFillWithNull(c *C) {
expVals := []tv{{300000.0, 15.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, nil}, {60000.0, nil}}
expVals := []tv{{300000.0, -370.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, nil}, {60000.0, nil}}
self.tstAggregateFill(aggTstData2, "difference", "null", emptyAggArgs, expVals, c)
}
// difference aggregate filling with 0
func (self *DataTestSuite) TestDifferenceAggregateFillWith0(c *C) {
expVals := []tv{{300000.0, 15.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.0}, {60000.0, 0.0}}
expVals := []tv{{300000.0, -370.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.0}, {60000.0, 0.0}}
self.tstAggregateFill(aggTstData2, "difference", "0", emptyAggArgs, expVals, c)
}
// histogram aggregate filling with null
func (self *DataTestSuite) TestHistogramAggregateFillWithNull(c *C) {
self.client.WriteJsonData(aggTstData2, c, influxdb.Millisecond)
series := self.client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "data", "null"), c)
series := self.client.RunQuery(fmtQuery("histogram", []interface{}{}, "data", "null"), c)
c.Assert(len(series), Equals, 1)
maps := ToMap(series[0])
c.Assert(len(maps), Equals, 6)
@ -2574,7 +2626,7 @@ func (self *DataTestSuite) TestHistogramAggregateFillWithNull(c *C) {
// histogram aggregate filling with 0
func (self *DataTestSuite) TestHistogramAggregateFillWith0(c *C) {
self.client.WriteJsonData(aggTstData2, c, influxdb.Millisecond)
series := self.client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "data", "0"), c)
series := self.client.RunQuery(fmtQuery("histogram", []interface{}{}, "data", "0"), c)
c.Assert(len(series), Equals, 1)
maps := ToMap(series[0])
c.Assert(len(maps), Equals, 6)