Fix #780 so that fill works with all aggregates
Conflicts: engine/aggregator.go integration/data_test.gopull/991/head
parent
4556bcb714
commit
2482f69d31
|
@ -122,11 +122,7 @@ func (self *CumulativeArithmeticAggregator) ColumnNames() []string {
|
||||||
|
|
||||||
func (self *CumulativeArithmeticAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *CumulativeArithmeticAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
if state == nil {
|
if state == nil {
|
||||||
return [][]*protocol.FieldValue{
|
return [][]*protocol.FieldValue{{self.defaultValue}}
|
||||||
{
|
|
||||||
{DoubleValue: &self.initialValue},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return [][]*protocol.FieldValue{
|
return [][]*protocol.FieldValue{
|
||||||
|
@ -317,7 +313,7 @@ func (self *StandardDeviationAggregator) ColumnNames() []string {
|
||||||
func (self *StandardDeviationAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *StandardDeviationAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
r, ok := state.(*StandardDeviationRunning)
|
r, ok := state.(*StandardDeviationRunning)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return [][]*protocol.FieldValue{{self.defaultValue}}
|
||||||
}
|
}
|
||||||
|
|
||||||
eX := r.totalX / float64(r.count)
|
eX := r.totalX / float64(r.count)
|
||||||
|
@ -413,8 +409,11 @@ func (self *DerivativeAggregator) ColumnNames() []string {
|
||||||
|
|
||||||
func (self *DerivativeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *DerivativeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
s, ok := state.(*DerivativeAggregatorState)
|
s, ok := state.(*DerivativeAggregatorState)
|
||||||
|
if !ok {
|
||||||
|
return [][]*protocol.FieldValue{{self.defaultValue}}
|
||||||
|
}
|
||||||
|
|
||||||
if !(ok && s.firstValue != nil && s.lastValue != nil) {
|
if s.firstValue == nil || s.lastValue == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -511,9 +510,8 @@ func (self *DifferenceAggregator) ColumnNames() []string {
|
||||||
|
|
||||||
func (self *DifferenceAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *DifferenceAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
s, ok := state.(*DifferenceAggregatorState)
|
s, ok := state.(*DifferenceAggregatorState)
|
||||||
|
|
||||||
if !(ok && s.firstValue != nil && s.lastValue != nil) {
|
if !(ok && s.firstValue != nil && s.lastValue != nil) {
|
||||||
return nil
|
return [][]*protocol.FieldValue{{self.defaultValue}}
|
||||||
}
|
}
|
||||||
|
|
||||||
difference := *s.lastValue.Values[0].DoubleValue - *s.firstValue.Values[0].DoubleValue
|
difference := *s.lastValue.Values[0].DoubleValue - *s.firstValue.Values[0].DoubleValue
|
||||||
|
@ -555,8 +553,9 @@ type HistogramAggregatorState map[int]int
|
||||||
|
|
||||||
type HistogramAggregator struct {
|
type HistogramAggregator struct {
|
||||||
AbstractAggregator
|
AbstractAggregator
|
||||||
bucketSize float64
|
bucketSize float64
|
||||||
columnNames []string
|
columnNames []string
|
||||||
|
defaultValue *protocol.FieldValue
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *HistogramAggregator) AggregatePoint(state interface{}, p *protocol.Point) (interface{}, error) {
|
func (self *HistogramAggregator) AggregatePoint(state interface{}, p *protocol.Point) (interface{}, error) {
|
||||||
|
@ -589,6 +588,14 @@ func (self *HistogramAggregator) ColumnNames() []string {
|
||||||
|
|
||||||
func (self *HistogramAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *HistogramAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
returnValues := [][]*protocol.FieldValue{}
|
returnValues := [][]*protocol.FieldValue{}
|
||||||
|
if state == nil {
|
||||||
|
_size := int64(0)
|
||||||
|
returnValues = append(returnValues, []*protocol.FieldValue{
|
||||||
|
self.defaultValue,
|
||||||
|
{Int64Value: &_size},
|
||||||
|
})
|
||||||
|
return returnValues
|
||||||
|
}
|
||||||
buckets := state.(HistogramAggregatorState)
|
buckets := state.(HistogramAggregatorState)
|
||||||
for bucket, size := range buckets {
|
for bucket, size := range buckets {
|
||||||
_bucket := float64(bucket) * self.bucketSize
|
_bucket := float64(bucket) * self.bucketSize
|
||||||
|
@ -876,9 +883,7 @@ func (self *PercentileAggregator) ColumnNames() []string {
|
||||||
func (self *PercentileAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *PercentileAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
s, ok := state.(*PercentileAggregatorState)
|
s, ok := state.(*PercentileAggregatorState)
|
||||||
if !ok {
|
if !ok {
|
||||||
return [][]*protocol.FieldValue{
|
return [][]*protocol.FieldValue{{self.defaultValue}}
|
||||||
{self.defaultValue},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return [][]*protocol.FieldValue{
|
return [][]*protocol.FieldValue{
|
||||||
{{DoubleValue: &s.percentileValue}},
|
{{DoubleValue: &s.percentileValue}},
|
||||||
|
@ -985,7 +990,10 @@ func (self *ModeAggregator) ColumnNames() []string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *ModeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *ModeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
s := state.(*ModeAggregatorState)
|
s, ok := state.(*ModeAggregatorState)
|
||||||
|
if !ok {
|
||||||
|
return [][]*protocol.FieldValue{{self.defaultValue}}
|
||||||
|
}
|
||||||
|
|
||||||
counts := make([]int, len(s.counts))
|
counts := make([]int, len(s.counts))
|
||||||
countMap := make(map[int][]interface{}, len(s.counts))
|
countMap := make(map[int][]interface{}, len(s.counts))
|
||||||
|
@ -1131,6 +1139,7 @@ func (self *DistinctAggregator) GetValues(state interface{}) [][]*protocol.Field
|
||||||
s, ok := state.(*DistinctAggregatorState)
|
s, ok := state.(*DistinctAggregatorState)
|
||||||
if !ok || len(s.counts) == 0 {
|
if !ok || len(s.counts) == 0 {
|
||||||
returnValues = append(returnValues, []*protocol.FieldValue{self.defaultValue})
|
returnValues = append(returnValues, []*protocol.FieldValue{self.defaultValue})
|
||||||
|
return returnValues
|
||||||
}
|
}
|
||||||
|
|
||||||
for value := range s.counts {
|
for value := range s.counts {
|
||||||
|
@ -1195,7 +1204,11 @@ func (self *FirstOrLastAggregator) ColumnNames() []string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *FirstOrLastAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
func (self *FirstOrLastAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||||
s := state.(FirstOrLastAggregatorState)
|
s, ok := state.(FirstOrLastAggregatorState)
|
||||||
|
if !ok {
|
||||||
|
return [][]*protocol.FieldValue{{self.defaultValue}}
|
||||||
|
}
|
||||||
|
|
||||||
return [][]*protocol.FieldValue{
|
return [][]*protocol.FieldValue{
|
||||||
{
|
{
|
||||||
s,
|
s,
|
||||||
|
|
|
@ -2509,3 +2509,516 @@ func (self *DataTestSuite) MeanAggregateFillWithZero(c *C) (Fun, Fun) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// issue #669
|
||||||
|
func HistogramHelper(c *C, client Client, query string, expected map[float64]float64) {
|
||||||
|
//Test basic histogram
|
||||||
|
collection := client.RunQuery(query, c)
|
||||||
|
|
||||||
|
c.Assert(collection, HasLen, 1)
|
||||||
|
maps := ToMap(collection[0])
|
||||||
|
actual := make(map[float64]float64, len(maps))
|
||||||
|
for key := range maps {
|
||||||
|
c.Logf(fmt.Sprintf(`%d: bucket_start: %f count: %f`, key, maps[key]["bucket_start"], maps[key]["count"]))
|
||||||
|
actual[maps[key]["bucket_start"].(float64)] = maps[key]["count"].(float64)
|
||||||
|
}
|
||||||
|
c.Assert(actual, HasLen, len(expected))
|
||||||
|
for bucket, count := range expected {
|
||||||
|
c.Assert(actual[bucket], Equals, count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *DataTestSuite) Histogram(c *C) (Fun, Fun) {
|
||||||
|
return func(client Client) {
|
||||||
|
c.Logf("Running Histogram test")
|
||||||
|
data := `[{"points": [[-3], [-2], [-1], [0], [1], [2], [3]], "name": "test_histogram", "columns": ["value"]}]`
|
||||||
|
client.WriteJsonData(data, c)
|
||||||
|
}, func(client Client) {
|
||||||
|
//Test basic histogram
|
||||||
|
expected := make(map[float64]float64, 7)
|
||||||
|
expected[-3.0] = 1.0
|
||||||
|
expected[-2.0] = 1.0
|
||||||
|
expected[-1.0] = 1.0
|
||||||
|
expected[0.0] = 1.0
|
||||||
|
expected[1.0] = 1.0
|
||||||
|
expected[2.0] = 1.0
|
||||||
|
expected[3.0] = 1.0
|
||||||
|
HistogramHelper(c, client, "select Histogram(value, 1.0) from test_histogram", expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func aggregateWithFillSetup(aggregate, fill string, c *C) Fun {
|
||||||
|
return func(client Client) {
|
||||||
|
seriesName := fmt.Sprintf("test_%s_fill_%s", aggregate, fill)
|
||||||
|
data := fmt.Sprintf("%s%s%s", `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"points": [
|
||||||
|
[300000, 30.0],
|
||||||
|
[120000, 20.0],
|
||||||
|
[60000, 10.0]
|
||||||
|
],
|
||||||
|
"name": "`, seriesName, `",
|
||||||
|
"columns": ["time", "value"]
|
||||||
|
}
|
||||||
|
]`)
|
||||||
|
client.WriteJsonData(data, c, influxdb.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func aggregateWithFillTest(aggregate string, aggArgs []interface{}, fill string, expVals []interface{}, c *C) Fun {
|
||||||
|
return func(client Client) {
|
||||||
|
seriesName := fmt.Sprintf("test_%s_fill_%s", aggregate, fill)
|
||||||
|
series := client.RunQuery(fmtFillQuery(aggregate, aggArgs, seriesName, fill), c)
|
||||||
|
c.Assert(len(series), Equals, 1)
|
||||||
|
maps := ToMap(series[0])
|
||||||
|
c.Assert(len(maps), Equals, len(expVals))
|
||||||
|
c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, aggregate: expVals[0]})
|
||||||
|
c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 240000.0, aggregate: expVals[1]})
|
||||||
|
c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 180000.0, aggregate: expVals[2]})
|
||||||
|
c.Assert(maps[3], DeepEquals, map[string]interface{}{"time": 120000.0, aggregate: expVals[3]})
|
||||||
|
c.Assert(maps[4], DeepEquals, map[string]interface{}{"time": 60000.0, aggregate: expVals[4]})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var emptyAggArgs []interface{}
|
||||||
|
|
||||||
|
// count aggregate filling with null
|
||||||
|
func (self *DataTestSuite) CountAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("count", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("count", emptyAggArgs, "null", []interface{}{1.0, nil, nil, 1.0, 1.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// count aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) CountAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("count", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("count", emptyAggArgs, "0", []interface{}{1.0, 0.0, 0.0, 1.0, 1.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// min aggregate filling with null
|
||||||
|
func (self *DataTestSuite) MinAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("min", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("min", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// min aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) MinAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("min", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("min", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// max aggregate filling with null
|
||||||
|
func (self *DataTestSuite) MaxAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("max", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("max", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// max aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) MaxAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("max", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("max", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// mode aggregate filling with null
|
||||||
|
func (self *DataTestSuite) ModeAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("mode", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("mode", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// mode aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) ModeAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("mode", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("mode", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// median aggregate filling with null
|
||||||
|
func (self *DataTestSuite) MedianAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("median", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("median", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// median aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) MedianAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("median", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("median", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// distinct aggregate filling with null
|
||||||
|
func (self *DataTestSuite) DistinctAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("distinct", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("distinct", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// distinct aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) DistinctAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("distinct", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("distinct", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// percentile aggregate filling with null
|
||||||
|
func (self *DataTestSuite) PercentileAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("percentile", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("percentile", []interface{}{10}, "null", []interface{}{0.0, nil, nil, 0.0, 0.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// percentile aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) PercentileAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("percentile", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("percentile", []interface{}{10}, "0", []interface{}{0.0, 0.0, 0.0, 0.0, 0.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// histogram aggregate filling with null
|
||||||
|
func (self *DataTestSuite) HistogramAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := func(client Client) {
|
||||||
|
data := `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"points": [
|
||||||
|
[300000, 30.0],
|
||||||
|
[120000, 20.0],
|
||||||
|
[60000, 10.0],
|
||||||
|
[310000, 15.0]
|
||||||
|
],
|
||||||
|
"name": "test_histogram_fill_null",
|
||||||
|
"columns": ["time", "value"]
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
client.WriteJsonData(data, c, influxdb.Millisecond)
|
||||||
|
|
||||||
|
}
|
||||||
|
testFn := func(client Client) {
|
||||||
|
series := client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "test_histogram_fill_null", "null"), c)
|
||||||
|
c.Assert(len(series), Equals, 1)
|
||||||
|
maps := ToMap(series[0])
|
||||||
|
c.Assert(len(maps), Equals, 6)
|
||||||
|
|
||||||
|
// FIXME: Can't test return values because the order of the returned data is randomized. Once
|
||||||
|
// engine/aggregator_operators.go func(self *HistogramAggregator) GetValues(...) is
|
||||||
|
// is modified to sort data, update these Asserts.
|
||||||
|
//c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 15.0, "count": 1.0})
|
||||||
|
//c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 30.0, "count": 1.0})
|
||||||
|
//c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 240000.0, "bucket_start": nil, "count": 0.0})
|
||||||
|
//c.Assert(maps[3], DeepEquals, map[string]interface{}{"time": 180000.0, "bucket_start": nil, "count": 0.0})
|
||||||
|
//c.Assert(maps[4], DeepEquals, map[string]interface{}{"time": 120000.0, "bucket_start": 20.0, "count": 1.0})
|
||||||
|
//c.Assert(maps[5], DeepEquals, map[string]interface{}{"time": 60000.0, "bucket_start": 10.0, "count": 1.0})
|
||||||
|
}
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// histogram aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) HistogramAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := func(client Client) {
|
||||||
|
data := `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"points": [
|
||||||
|
[300000, 30.0],
|
||||||
|
[120000, 20.0],
|
||||||
|
[60000, 10.0],
|
||||||
|
[310000, 15.0]
|
||||||
|
],
|
||||||
|
"name": "test_histogram_fill_0",
|
||||||
|
"columns": ["time", "value"]
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
client.WriteJsonData(data, c, influxdb.Millisecond)
|
||||||
|
|
||||||
|
}
|
||||||
|
testFn := func(client Client) {
|
||||||
|
series := client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "test_histogram_fill_0", "0"), c)
|
||||||
|
c.Assert(len(series), Equals, 1)
|
||||||
|
maps := ToMap(series[0])
|
||||||
|
c.Assert(len(maps), Equals, 6)
|
||||||
|
|
||||||
|
// FIXME: Can't test return values because the order of the returned data is randomized. Once
|
||||||
|
// engine/aggregator_operators.go func(self *HistogramAggregator) GetValues(...) is
|
||||||
|
// is modified to sort data, update these Asserts.
|
||||||
|
//c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 15.0, "count": 1.0})
|
||||||
|
//c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 300000.0, "bucket_start": 30.0, "count": 1.0})
|
||||||
|
//c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 240000.0, "bucket_start": nil, "count": 0.0})
|
||||||
|
//c.Assert(maps[3], DeepEquals, map[string]interface{}{"time": 180000.0, "bucket_start": nil, "count": 0.0})
|
||||||
|
//c.Assert(maps[4], DeepEquals, map[string]interface{}{"time": 120000.0, "bucket_start": 20.0, "count": 1.0})
|
||||||
|
//c.Assert(maps[5], DeepEquals, map[string]interface{}{"time": 60000.0, "bucket_start": 10.0, "count": 1.0})
|
||||||
|
}
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// derivative aggregate filling with null
|
||||||
|
func (self *DataTestSuite) DerivativeAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := func(client Client) {
|
||||||
|
data := `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"points": [
|
||||||
|
[300000, 30.0],
|
||||||
|
[120000, 20.0],
|
||||||
|
[60000, 10.0],
|
||||||
|
[310000, 15.0]
|
||||||
|
],
|
||||||
|
"name": "test_derivative_fill_null",
|
||||||
|
"columns": ["time", "value"]
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
client.WriteJsonData(data, c, influxdb.Millisecond)
|
||||||
|
|
||||||
|
}
|
||||||
|
testFn := func(client Client) {
|
||||||
|
series := client.RunQuery(fmtFillQuery("derivative", []interface{}{}, "test_derivative_fill_null", "null"), c)
|
||||||
|
c.Assert(len(series), Equals, 1)
|
||||||
|
maps := ToMap(series[0])
|
||||||
|
c.Assert(len(maps), Equals, 3)
|
||||||
|
|
||||||
|
c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "derivative": -1.5})
|
||||||
|
c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 240000.0, "derivative": nil})
|
||||||
|
c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 180000.0, "derivative": nil})
|
||||||
|
}
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// derivative aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) DerivativeAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := func(client Client) {
|
||||||
|
data := `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"points": [
|
||||||
|
[300000, 30.0],
|
||||||
|
[120000, 20.0],
|
||||||
|
[60000, 10.0],
|
||||||
|
[310000, 15.0]
|
||||||
|
],
|
||||||
|
"name": "test_derivative_fill_0",
|
||||||
|
"columns": ["time", "value"]
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
client.WriteJsonData(data, c, influxdb.Millisecond)
|
||||||
|
|
||||||
|
}
|
||||||
|
testFn := func(client Client) {
|
||||||
|
series := client.RunQuery(fmtFillQuery("derivative", []interface{}{}, "test_derivative_fill_0", "0"), c)
|
||||||
|
c.Assert(len(series), Equals, 1)
|
||||||
|
maps := ToMap(series[0])
|
||||||
|
c.Assert(len(maps), Equals, 3)
|
||||||
|
|
||||||
|
c.Assert(maps[0], DeepEquals, map[string]interface{}{"time": 300000.0, "derivative": -1.5})
|
||||||
|
c.Assert(maps[1], DeepEquals, map[string]interface{}{"time": 240000.0, "derivative": 0.0})
|
||||||
|
c.Assert(maps[2], DeepEquals, map[string]interface{}{"time": 180000.0, "derivative": 0.0})
|
||||||
|
}
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// sum aggregate filling with null
|
||||||
|
func (self *DataTestSuite) SumAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("sum", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("sum", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// sum aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) SumAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("sum", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("sum", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// stddev aggregate filling with null
|
||||||
|
func (self *DataTestSuite) StddevAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("stddev", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("stddev", emptyAggArgs, "null", []interface{}{0.0, nil, nil, 0.0, 0.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// stddev aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) StddevAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("stddev", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("stddev", emptyAggArgs, "0", []interface{}{0.0, 0.0, 0.0, 0.0, 0.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// first aggregate filling with null
|
||||||
|
func (self *DataTestSuite) FirstAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("first", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("first", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// first aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) FirstAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("first", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("first", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// last aggregate filling with null
|
||||||
|
func (self *DataTestSuite) LastAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("last", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("last", emptyAggArgs, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// last aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) LastAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("last", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("last", emptyAggArgs, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// difference aggregate filling with null
|
||||||
|
func (self *DataTestSuite) DifferenceAggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := func(client Client) {
|
||||||
|
data := `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"points": [
|
||||||
|
[300000, 30.0],
|
||||||
|
[120000, 20.0],
|
||||||
|
[60000, 10.0],
|
||||||
|
[310000, 15.0]
|
||||||
|
],
|
||||||
|
"name": "test_difference_fill_null",
|
||||||
|
"columns": ["time", "value"]
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
client.WriteJsonData(data, c, influxdb.Millisecond)
|
||||||
|
|
||||||
|
}
|
||||||
|
testFn := aggregateWithFillTest("difference", emptyAggArgs, "null", []interface{}{15.0, nil, nil, nil, nil}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// difference aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) DifferenceAggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := func(client Client) {
|
||||||
|
data := `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"points": [
|
||||||
|
[300000, 30.0],
|
||||||
|
[120000, 20.0],
|
||||||
|
[60000, 10.0],
|
||||||
|
[310000, 15.0]
|
||||||
|
],
|
||||||
|
"name": "test_difference_fill_0",
|
||||||
|
"columns": ["time", "value"]
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
client.WriteJsonData(data, c, influxdb.Millisecond)
|
||||||
|
|
||||||
|
}
|
||||||
|
testFn := aggregateWithFillTest("difference", emptyAggArgs, "0", []interface{}{15.0, 0.0, 0.0, 0.0, 0.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// top 1 aggregate filling with null
|
||||||
|
func (self *DataTestSuite) Top1AggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("top", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("top", []interface{}{1}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// top 1 aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) Top1AggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("top", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("top", []interface{}{1}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// top 10 aggregate filling with null
|
||||||
|
func (self *DataTestSuite) Top10AggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("top", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("top", []interface{}{10}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// top 10 aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) Top10AggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("top", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("top", []interface{}{10}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// bottom 1 aggregate filling with null
|
||||||
|
func (self *DataTestSuite) Bottom1AggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("bottom", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("bottom", []interface{}{1}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// bottom 1 aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) Bottom1AggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("bottom", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("bottom", []interface{}{1}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// bottom 10 aggregate filling with null
|
||||||
|
func (self *DataTestSuite) Bottom10AggregateFillWithNull(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("bottom", "null", c)
|
||||||
|
testFn := aggregateWithFillTest("bottom", []interface{}{10}, "null", []interface{}{30.0, nil, nil, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
// bottom 10 aggregate filling with 0
|
||||||
|
func (self *DataTestSuite) Bottom10AggregateFillWith0(c *C) (Fun, Fun) {
|
||||||
|
setupFn := aggregateWithFillSetup("bottom", "0", c)
|
||||||
|
testFn := aggregateWithFillTest("bottom", []interface{}{10}, "0", []interface{}{30.0, 0.0, 0.0, 20.0, 10.0}, c)
|
||||||
|
|
||||||
|
return setupFn, testFn
|
||||||
|
}
|
||||||
|
|
||||||
|
func fmtFillQuery(aggregate string, aggArgs []interface{}, series, fill string) string {
|
||||||
|
args := "value"
|
||||||
|
for _, arg := range aggArgs {
|
||||||
|
args = fmt.Sprintf("%s, %v", args, arg)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("select %s(%s) from %s group by time(60s) fill(%s) where time > 60s and time < 320s", aggregate, args, series, fill)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue