From 2482f69d312d2df10bb77ec5bcb982a0f2ef8e21 Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 9 Sep 2014 18:57:46 -0400 Subject: [PATCH] Fix #780 so that fill works with all aggregates Conflicts: engine/aggregator.go integration/data_test.go --- engine/aggregator.go | 45 ++-- integration/data_test.go | 513 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 542 insertions(+), 16 deletions(-) diff --git a/engine/aggregator.go b/engine/aggregator.go index 0ef08048f0..3351845e80 100644 --- a/engine/aggregator.go +++ b/engine/aggregator.go @@ -122,11 +122,7 @@ func (self *CumulativeArithmeticAggregator) ColumnNames() []string { func (self *CumulativeArithmeticAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { if state == nil { - return [][]*protocol.FieldValue{ - { - {DoubleValue: &self.initialValue}, - }, - } + return [][]*protocol.FieldValue{{self.defaultValue}} } return [][]*protocol.FieldValue{ @@ -317,7 +313,7 @@ func (self *StandardDeviationAggregator) ColumnNames() []string { func (self *StandardDeviationAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { r, ok := state.(*StandardDeviationRunning) if !ok { - return nil + return [][]*protocol.FieldValue{{self.defaultValue}} } eX := r.totalX / float64(r.count) @@ -413,8 +409,11 @@ func (self *DerivativeAggregator) ColumnNames() []string { func (self *DerivativeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { s, ok := state.(*DerivativeAggregatorState) + if !ok { + return [][]*protocol.FieldValue{{self.defaultValue}} + } - if !(ok && s.firstValue != nil && s.lastValue != nil) { + if s.firstValue == nil || s.lastValue == nil { return nil } @@ -511,9 +510,8 @@ func (self *DifferenceAggregator) ColumnNames() []string { func (self *DifferenceAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { s, ok := state.(*DifferenceAggregatorState) - if !(ok && s.firstValue != nil && s.lastValue != nil) { - return nil + return [][]*protocol.FieldValue{{self.defaultValue}} } difference := *s.lastValue.Values[0].DoubleValue - *s.firstValue.Values[0].DoubleValue @@ -555,8 +553,9 @@ type HistogramAggregatorState map[int]int type HistogramAggregator struct { AbstractAggregator - bucketSize float64 - columnNames []string + bucketSize float64 + columnNames []string + defaultValue *protocol.FieldValue } func (self *HistogramAggregator) AggregatePoint(state interface{}, p *protocol.Point) (interface{}, error) { @@ -589,6 +588,14 @@ func (self *HistogramAggregator) ColumnNames() []string { func (self *HistogramAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { returnValues := [][]*protocol.FieldValue{} + if state == nil { + _size := int64(0) + returnValues = append(returnValues, []*protocol.FieldValue{ + self.defaultValue, + {Int64Value: &_size}, + }) + return returnValues + } buckets := state.(HistogramAggregatorState) for bucket, size := range buckets { _bucket := float64(bucket) * self.bucketSize @@ -876,9 +883,7 @@ func (self *PercentileAggregator) ColumnNames() []string { func (self *PercentileAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { s, ok := state.(*PercentileAggregatorState) if !ok { - return [][]*protocol.FieldValue{ - {self.defaultValue}, - } + return [][]*protocol.FieldValue{{self.defaultValue}} } return [][]*protocol.FieldValue{ {{DoubleValue: &s.percentileValue}}, @@ -985,7 +990,10 @@ func (self *ModeAggregator) ColumnNames() []string { } func (self *ModeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { - s := state.(*ModeAggregatorState) + s, ok := state.(*ModeAggregatorState) + if !ok { + return [][]*protocol.FieldValue{{self.defaultValue}} + } counts := make([]int, len(s.counts)) countMap := make(map[int][]interface{}, len(s.counts)) @@ -1131,6 +1139,7 @@ func (self *DistinctAggregator) GetValues(state interface{}) [][]*protocol.Field s, ok := state.(*DistinctAggregatorState) if !ok || len(s.counts) == 0 { returnValues = append(returnValues, []*protocol.FieldValue{self.defaultValue}) + return returnValues } for value := range s.counts { @@ -1195,7 +1204,11 @@ func (self *FirstOrLastAggregator) ColumnNames() []string { } func (self *FirstOrLastAggregator) GetValues(state interface{}) [][]*protocol.FieldValue { - s := state.(FirstOrLastAggregatorState) + s, ok := state.(FirstOrLastAggregatorState) + if !ok { + return [][]*protocol.FieldValue{{self.defaultValue}} + } + return [][]*protocol.FieldValue{ { s, diff --git a/integration/data_test.go b/integration/data_test.go index df52f6b9e0..4707eaf226 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -2509,3 +2509,516 @@ func (self *DataTestSuite) MeanAggregateFillWithZero(c *C) (Fun, Fun) { } } } + +// issue #669 +func HistogramHelper(c *C, client Client, query string, expected map[float64]float64) { + //Test basic histogram + collection := client.RunQuery(query, c) + + c.Assert(collection, HasLen, 1) + maps := ToMap(collection[0]) + actual := make(map[float64]float64, len(maps)) + for key := range maps { + c.Logf(fmt.Sprintf(`%d: bucket_start: %f count: %f`, key, maps[key]["bucket_start"], maps[key]["count"])) + actual[maps[key]["bucket_start"].(float64)] = maps[key]["count"].(float64) + } + c.Assert(actual, HasLen, len(expected)) + for bucket, count := range expected { + c.Assert(actual[bucket], Equals, count) + } +} + +func (self *DataTestSuite) Histogram(c *C) (Fun, Fun) { + return func(client Client) { + c.Logf("Running Histogram test") + data := `[{"points": [[-3], [-2], [-1], [0], [1], [2], [3]], "name": "test_histogram", "columns": ["value"]}]` + client.WriteJsonData(data, c) + }, func(client Client) { + //Test basic histogram + expected := make(map[float64]float64, 7) + expected[-3.0] = 1.0 + expected[-2.0] = 1.0 + expected[-1.0] = 1.0 + expected[0.0] = 1.0 + expected[1.0] = 1.0 + expected[2.0] = 1.0 + expected[3.0] = 1.0 + HistogramHelper(c, client, "select Histogram(value, 1.0) from test_histogram", expected) + } +} + +func aggregateWithFillSetup(aggregate, fill string, c *C) Fun { + return func(client Client) { + seriesName := fmt.Sprintf("test_%s_fill_%s", aggregate, fill) + data := fmt.Sprintf("%s%s%s", ` +[ + { + "points": [ + [300000, 30.0], + [120000, 20.0], + [60000, 10.0] + ], + "name": "`, seriesName, `", + "columns": ["time", "value"] + } +]`) + client.WriteJsonData(data, c, influxdb.Millisecond) + } +} + +func aggregateWithFillTest(aggregate string, aggArgs []interface{}, fill string, expVals []interface{}, c *C) Fun { + return func(client Client) { + seriesName := fmt.Sprintf("test_%s_fill_%s", aggregate, fill) + series := client.RunQuery(fmtFillQuery(aggregate, aggArgs, seriesName, fill), c) + c.Assert(len(series), Equals, 1) + maps := ToMap(series[0]) + c.Assert(len(maps), Equals, len(expVals)) + c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, aggregate: expVals[0]}) + c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 240000.0, aggregate: expVals[1]}) + c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 180000.0, aggregate: expVals[2]}) + c.Assert(maps[3], DeepEquals, map[string]interface{}{"time": 120000.0, aggregate: expVals[3]}) + c.Assert(maps[4], DeepEquals, map[string]interface{}{"time": 60000.0, aggregate: expVals[4]}) + } +} + +var emptyAggArgs []interface{} + +// count aggregate filling with null +func (self *DataTestSuite) CountAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("count", "null", c) + testFn := aggregateWithFillTest("count", emptyAggArgs, "null", []interface{}{1.0, nil, nil, 1.0, 1.0}, c) + + return setupFn, testFn +} + +// count aggregate filling with 0 +func (self *DataTestSuite) CountAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("count", "0", c) + testFn := aggregateWithFillTest("count", emptyAggArgs, "0", []interface{}{1.0, 0.0, 0.0, 1.0, 1.0}, c) + + return setupFn, testFn +} + +// min aggregate filling with null +func (self *DataTestSuite) MinAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("min", "null", c) + testFn := aggregateWithFillTest("min", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// min aggregate filling with 0 +func (self *DataTestSuite) MinAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("min", "0", c) + testFn := aggregateWithFillTest("min", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// max aggregate filling with null +func (self *DataTestSuite) MaxAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("max", "null", c) + testFn := aggregateWithFillTest("max", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// max aggregate filling with 0 +func (self *DataTestSuite) MaxAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("max", "0", c) + testFn := aggregateWithFillTest("max", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// mode aggregate filling with null +func (self *DataTestSuite) ModeAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("mode", "null", c) + testFn := aggregateWithFillTest("mode", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// mode aggregate filling with 0 +func (self *DataTestSuite) ModeAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("mode", "0", c) + testFn := aggregateWithFillTest("mode", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// median aggregate filling with null +func (self *DataTestSuite) MedianAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("median", "null", c) + testFn := aggregateWithFillTest("median", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// median aggregate filling with 0 +func (self *DataTestSuite) MedianAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("median", "0", c) + testFn := aggregateWithFillTest("median", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// distinct aggregate filling with null +func (self *DataTestSuite) DistinctAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("distinct", "null", c) + testFn := aggregateWithFillTest("distinct", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// distinct aggregate filling with 0 +func (self *DataTestSuite) DistinctAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("distinct", "0", c) + testFn := aggregateWithFillTest("distinct", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// percentile aggregate filling with null +func (self *DataTestSuite) PercentileAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("percentile", "null", c) + testFn := aggregateWithFillTest("percentile", []interface{}{10}, "null", []interface{}{0.0, nil, nil, 0.0, 0.0}, c) + + return setupFn, testFn +} + +// percentile aggregate filling with 0 +func (self *DataTestSuite) PercentileAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("percentile", "0", c) + testFn := aggregateWithFillTest("percentile", []interface{}{10}, "0", []interface{}{0.0, 0.0, 0.0, 0.0, 0.0}, c) + + return setupFn, testFn +} + +// histogram aggregate filling with null +func (self *DataTestSuite) HistogramAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := func(client Client) { + data := ` +[ + { + "points": [ + [300000, 30.0], + [120000, 20.0], + [60000, 10.0], + [310000, 15.0] + ], + "name": "test_histogram_fill_null", + "columns": ["time", "value"] + } +]` + client.WriteJsonData(data, c, influxdb.Millisecond) + + } + testFn := func(client Client) { + series := client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "test_histogram_fill_null", "null"), c) + c.Assert(len(series), Equals, 1) + maps := ToMap(series[0]) + c.Assert(len(maps), Equals, 6) + + // FIXME: Can't test return values because the order of the returned data is randomized. Once + // engine/aggregator_operators.go func(self *HistogramAggregator) GetValues(...) is + // is modified to sort data, update these Asserts. + //c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 15.0, "count": 1.0}) + //c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 30.0, "count": 1.0}) + //c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 240000.0, "bucket_start": nil, "count": 0.0}) + //c.Assert(maps[3], DeepEquals, map[string]interface{}{"time": 180000.0, "bucket_start": nil, "count": 0.0}) + //c.Assert(maps[4], DeepEquals, map[string]interface{}{"time": 120000.0, "bucket_start": 20.0, "count": 1.0}) + //c.Assert(maps[5], DeepEquals, map[string]interface{}{"time": 60000.0, "bucket_start": 10.0, "count": 1.0}) + } + + return setupFn, testFn +} + +// histogram aggregate filling with 0 +func (self *DataTestSuite) HistogramAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := func(client Client) { + data := ` +[ + { + "points": [ + [300000, 30.0], + [120000, 20.0], + [60000, 10.0], + [310000, 15.0] + ], + "name": "test_histogram_fill_0", + "columns": ["time", "value"] + } +]` + client.WriteJsonData(data, c, influxdb.Millisecond) + + } + testFn := func(client Client) { + series := client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "test_histogram_fill_0", "0"), c) + c.Assert(len(series), Equals, 1) + maps := ToMap(series[0]) + c.Assert(len(maps), Equals, 6) + + // FIXME: Can't test return values because the order of the returned data is randomized. Once + // engine/aggregator_operators.go func(self *HistogramAggregator) GetValues(...) is + // is modified to sort data, update these Asserts. + //c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 15.0, "count": 1.0}) + //c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 30.0, "count": 1.0}) + //c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 240000.0, "bucket_start": nil, "count": 0.0}) + //c.Assert(maps[3], DeepEquals, map[string]interface{}{"time": 180000.0, "bucket_start": nil, "count": 0.0}) + //c.Assert(maps[4], DeepEquals, map[string]interface{}{"time": 120000.0, "bucket_start": 20.0, "count": 1.0}) + //c.Assert(maps[5], DeepEquals, map[string]interface{}{"time": 60000.0, "bucket_start": 10.0, "count": 1.0}) + } + + return setupFn, testFn +} + +// derivative aggregate filling with null +func (self *DataTestSuite) DerivativeAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := func(client Client) { + data := ` +[ + { + "points": [ + [300000, 30.0], + [120000, 20.0], + [60000, 10.0], + [310000, 15.0] + ], + "name": "test_derivative_fill_null", + "columns": ["time", "value"] + } +]` + client.WriteJsonData(data, c, influxdb.Millisecond) + + } + testFn := func(client Client) { + series := client.RunQuery(fmtFillQuery("derivative", []interface{}{}, "test_derivative_fill_null", "null"), c) + c.Assert(len(series), Equals, 1) + maps := ToMap(series[0]) + c.Assert(len(maps), Equals, 3) + + c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "derivative": -1.5}) + c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 240000.0, "derivative": nil}) + c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 180000.0, "derivative": nil}) + } + + return setupFn, testFn +} + +// derivative aggregate filling with 0 +func (self *DataTestSuite) DerivativeAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := func(client Client) { + data := ` +[ + { + "points": [ + [300000, 30.0], + [120000, 20.0], + [60000, 10.0], + [310000, 15.0] + ], + "name": "test_derivative_fill_0", + "columns": ["time", "value"] + } +]` + client.WriteJsonData(data, c, influxdb.Millisecond) + + } + testFn := func(client Client) { + series := client.RunQuery(fmtFillQuery("derivative", []interface{}{}, "test_derivative_fill_0", "0"), c) + c.Assert(len(series), Equals, 1) + maps := ToMap(series[0]) + c.Assert(len(maps), Equals, 3) + + c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "derivative": -1.5}) + c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 240000.0, "derivative": 0.0}) + c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 180000.0, "derivative": 0.0}) + } + + return setupFn, testFn +} + +// sum aggregate filling with null +func (self *DataTestSuite) SumAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("sum", "null", c) + testFn := aggregateWithFillTest("sum", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// sum aggregate filling with 0 +func (self *DataTestSuite) SumAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("sum", "0", c) + testFn := aggregateWithFillTest("sum", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// stddev aggregate filling with null +func (self *DataTestSuite) StddevAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("stddev", "null", c) + testFn := aggregateWithFillTest("stddev", emptyAggArgs, "null", []interface{}{0.0, nil, nil, 0.0, 0.0}, c) + + return setupFn, testFn +} + +// stddev aggregate filling with 0 +func (self *DataTestSuite) StddevAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("stddev", "0", c) + testFn := aggregateWithFillTest("stddev", emptyAggArgs, "0", []interface{}{0.0, 0.0, 0.0, 0.0, 0.0}, c) + + return setupFn, testFn +} + +// first aggregate filling with null +func (self *DataTestSuite) FirstAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("first", "null", c) + testFn := aggregateWithFillTest("first", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// first aggregate filling with 0 +func (self *DataTestSuite) FirstAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("first", "0", c) + testFn := aggregateWithFillTest("first", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// last aggregate filling with null +func (self *DataTestSuite) LastAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("last", "null", c) + testFn := aggregateWithFillTest("last", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// last aggregate filling with 0 +func (self *DataTestSuite) LastAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("last", "0", c) + testFn := aggregateWithFillTest("last", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// difference aggregate filling with null +func (self *DataTestSuite) DifferenceAggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := func(client Client) { + data := ` +[ + { + "points": [ + [300000, 30.0], + [120000, 20.0], + [60000, 10.0], + [310000, 15.0] + ], + "name": "test_difference_fill_null", + "columns": ["time", "value"] + } +]` + client.WriteJsonData(data, c, influxdb.Millisecond) + + } + testFn := aggregateWithFillTest("difference", emptyAggArgs, "null", []interface{}{15.0, nil, nil, nil, nil}, c) + + return setupFn, testFn +} + +// difference aggregate filling with 0 +func (self *DataTestSuite) DifferenceAggregateFillWith0(c *C) (Fun, Fun) { + setupFn := func(client Client) { + data := ` +[ + { + "points": [ + [300000, 30.0], + [120000, 20.0], + [60000, 10.0], + [310000, 15.0] + ], + "name": "test_difference_fill_0", + "columns": ["time", "value"] + } +]` + client.WriteJsonData(data, c, influxdb.Millisecond) + + } + testFn := aggregateWithFillTest("difference", emptyAggArgs, "0", []interface{}{15.0, 0.0, 0.0, 0.0, 0.0}, c) + + return setupFn, testFn +} + +// top 1 aggregate filling with null +func (self *DataTestSuite) Top1AggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("top", "null", c) + testFn := aggregateWithFillTest("top", []interface{}{1}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// top 1 aggregate filling with 0 +func (self *DataTestSuite) Top1AggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("top", "0", c) + testFn := aggregateWithFillTest("top", []interface{}{1}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// top 10 aggregate filling with null +func (self *DataTestSuite) Top10AggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("top", "null", c) + testFn := aggregateWithFillTest("top", []interface{}{10}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// top 10 aggregate filling with 0 +func (self *DataTestSuite) Top10AggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("top", "0", c) + testFn := aggregateWithFillTest("top", []interface{}{10}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// bottom 1 aggregate filling with null +func (self *DataTestSuite) Bottom1AggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("bottom", "null", c) + testFn := aggregateWithFillTest("bottom", []interface{}{1}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// bottom 1 aggregate filling with 0 +func (self *DataTestSuite) Bottom1AggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("bottom", "0", c) + testFn := aggregateWithFillTest("bottom", []interface{}{1}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// bottom 10 aggregate filling with null +func (self *DataTestSuite) Bottom10AggregateFillWithNull(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("bottom", "null", c) + testFn := aggregateWithFillTest("bottom", []interface{}{10}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c) + + return setupFn, testFn +} + +// bottom 10 aggregate filling with 0 +func (self *DataTestSuite) Bottom10AggregateFillWith0(c *C) (Fun, Fun) { + setupFn := aggregateWithFillSetup("bottom", "0", c) + testFn := aggregateWithFillTest("bottom", []interface{}{10}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c) + + return setupFn, testFn +} + +func fmtFillQuery(aggregate string, aggArgs []interface{}, series, fill string) string { + args := "value" + for _, arg := range aggArgs { + args = fmt.Sprintf("%s, %v", args, arg) + } + return fmt.Sprintf("select %s(%s) from %s group by time(60s) fill(%s) where time > 60s and time < 320s", aggregate, args, series, fill) +}