parent
f2e57008e2
commit
c8c40097c6
|
@ -557,8 +557,11 @@ type HistogramAggregatorState map[int]int
|
|||
|
||||
type HistogramAggregator struct {
|
||||
AbstractAggregator
|
||||
bucketSize float64
|
||||
columnNames []string
|
||||
bucketSize float64
|
||||
bucketStart float64
|
||||
explicitBucketStart bool
|
||||
bucketStopIdx int
|
||||
columnNames []string
|
||||
}
|
||||
|
||||
func (self *HistogramAggregator) AggregatePoint(state interface{}, p *protocol.Point) (interface{}, error) {
|
||||
|
@ -579,8 +582,14 @@ func (self *HistogramAggregator) AggregatePoint(state interface{}, p *protocol.P
|
|||
value = *ptr
|
||||
}
|
||||
|
||||
bucket := int(value / self.bucketSize)
|
||||
buckets[bucket] += 1
|
||||
bucket := int(math.Floor((value - self.bucketStart)/ self.bucketSize))
|
||||
if self.bucketStopIdx >= 0 {
|
||||
if bucket <= self.bucketStopIdx {
|
||||
buckets[bucket] += 1
|
||||
}
|
||||
} else {
|
||||
buckets[bucket] += 1
|
||||
}
|
||||
|
||||
return buckets, nil
|
||||
}
|
||||
|
@ -593,15 +602,34 @@ func (self *HistogramAggregator) GetValues(state interface{}) [][]*protocol.Fiel
|
|||
returnValues := [][]*protocol.FieldValue{}
|
||||
buckets := state.(HistogramAggregatorState)
|
||||
for bucket, size := range buckets {
|
||||
_bucket := float64(bucket) * self.bucketSize
|
||||
_bucket := float64(bucket) * self.bucketSize + self.bucketStart
|
||||
_size := int64(size)
|
||||
|
||||
if self.explicitBucketStart && _bucket < self.bucketStart {
|
||||
continue
|
||||
}
|
||||
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{
|
||||
{DoubleValue: &_bucket},
|
||||
{Int64Value: &_size},
|
||||
})
|
||||
}
|
||||
|
||||
if self.bucketStopIdx >= 0 {
|
||||
for i := 0; i <= self.bucketStopIdx; i++ {
|
||||
if _, ok := buckets[i]; ! ok {
|
||||
_bucket := float64(i) * self.bucketSize + self.bucketStart
|
||||
_size := int64(0)
|
||||
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{
|
||||
{DoubleValue: &_bucket},
|
||||
{Int64Value: &_size},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return returnValues
|
||||
}
|
||||
|
||||
|
@ -610,8 +638,8 @@ func NewHistogramAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue
|
|||
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function histogram() requires at least one arguments")
|
||||
}
|
||||
|
||||
if len(v.Elems) > 2 {
|
||||
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function histogram() takes at most two arguments")
|
||||
if len(v.Elems) > 4 {
|
||||
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function histogram() takes at most four arguments")
|
||||
}
|
||||
|
||||
if v.Elems[0].Type == parser.ValueWildcard {
|
||||
|
@ -619,8 +647,12 @@ func NewHistogramAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue
|
|||
}
|
||||
|
||||
bucketSize := 1.0
|
||||
bucketStart := 0.0
|
||||
explicitBucketStart := false
|
||||
bucketStop := 0.0
|
||||
bucketStopIdx := -1
|
||||
|
||||
if len(v.Elems) == 2 {
|
||||
if len(v.Elems) > 1 {
|
||||
switch v.Elems[1].Type {
|
||||
case parser.ValueInt, parser.ValueFloat:
|
||||
var err error
|
||||
|
@ -631,8 +663,35 @@ func NewHistogramAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue
|
|||
default:
|
||||
return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", v.Elems[1].Name)
|
||||
}
|
||||
if len(v.Elems) > 2 {
|
||||
switch v.Elems[2].Type {
|
||||
case parser.ValueInt, parser.ValueFloat:
|
||||
var err error
|
||||
bucketStart, err = strconv.ParseFloat(v.Elems[2].Name, 64)
|
||||
explicitBucketStart = true
|
||||
if err != nil {
|
||||
return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", v.Elems[2].Name)
|
||||
}
|
||||
default:
|
||||
return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", v.Elems[2].Name)
|
||||
}
|
||||
if len(v.Elems) == 4 {
|
||||
switch v.Elems[3].Type {
|
||||
case parser.ValueInt, parser.ValueFloat:
|
||||
var err error
|
||||
bucketStop, err = strconv.ParseFloat(v.Elems[3].Name, 64)
|
||||
bucketStopIdx = int(math.Floor((bucketStop - bucketStart) / bucketSize))
|
||||
if err != nil {
|
||||
return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", v.Elems[3].Name)
|
||||
}
|
||||
default:
|
||||
return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", v.Elems[3].Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
columnNames := []string{"bucket_start", "count"}
|
||||
if v.Alias != "" {
|
||||
columnNames[0] = fmt.Sprintf("%s_bucket_start", v.Alias)
|
||||
|
@ -643,8 +702,11 @@ func NewHistogramAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue
|
|||
AbstractAggregator: AbstractAggregator{
|
||||
value: v.Elems[0],
|
||||
},
|
||||
bucketSize: bucketSize,
|
||||
columnNames: columnNames,
|
||||
bucketSize: bucketSize,
|
||||
bucketStart: bucketStart,
|
||||
explicitBucketStart: explicitBucketStart,
|
||||
bucketStopIdx: bucketStopIdx,
|
||||
columnNames: columnNames,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2320,3 +2320,96 @@ func (self *DataTestSuite) MeanAggregateFillWithZero(c *C) (Fun, Fun) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// issue #669
|
||||
func HistogramHelper(c *C, client Client, query string, expected map[float64]float64) {
|
||||
//Test basic histogram
|
||||
collection := client.RunQuery(query, c)
|
||||
|
||||
c.Assert(collection, HasLen, 1)
|
||||
maps := ToMap(collection[0])
|
||||
actual := make(map[float64]float64, len(maps))
|
||||
for key := range maps {
|
||||
c.Logf(fmt.Sprintf(`%d: bucket_start: %f count: %f`, key, maps[key]["bucket_start"], maps[key]["count"]))
|
||||
actual[maps[key]["bucket_start"].(float64)] = maps[key]["count"].(float64)
|
||||
}
|
||||
c.Assert(actual, HasLen, len(expected))
|
||||
for bucket, count := range expected {
|
||||
c.Assert(actual[bucket], Equals, count)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *DataTestSuite) Histogram(c *C) (Fun, Fun) {
|
||||
return func(client Client) {
|
||||
c.Logf("Running Histogram test")
|
||||
data := `[{"points": [[-3], [-2], [-1], [0], [1], [2], [3]], "name": "test_histogram", "columns": ["value"]}]`
|
||||
client.WriteJsonData(data, c)
|
||||
}, func(client Client) {
|
||||
//Test basic histogram
|
||||
expected := make(map[float64]float64, 7)
|
||||
expected[-3.0] = 1.0
|
||||
expected[-2.0] = 1.0
|
||||
expected[-1.0] = 1.0
|
||||
expected[0.0] = 1.0
|
||||
expected[1.0] = 1.0
|
||||
expected[2.0] = 1.0
|
||||
expected[3.0] = 1.0
|
||||
HistogramHelper(c, client, "select Histogram(value, 1.0) from test_histogram", expected)
|
||||
|
||||
// Test specifying start and stop
|
||||
HistogramHelper(c, client, "select Histogram(value, 1.0, -3, 3) from test_histogram", expected)
|
||||
|
||||
// Test specifying start and stop outside domain of data
|
||||
expected = make(map[float64]float64, 21)
|
||||
expected[-10.0] = 0.0
|
||||
expected[-9.0] = 0.0
|
||||
expected[-8.0] = 0.0
|
||||
expected[-7.0] = 0.0
|
||||
expected[-6.0] = 0.0
|
||||
expected[-5.0] = 0.0
|
||||
expected[-4.0] = 0.0
|
||||
expected[-3.0] = 1.0
|
||||
expected[-2.0] = 1.0
|
||||
expected[-1.0] = 1.0
|
||||
expected[0.0] = 1.0
|
||||
expected[1.0] = 1.0
|
||||
expected[2.0] = 1.0
|
||||
expected[3.0] = 1.0
|
||||
expected[4.0] = 0.0
|
||||
expected[5.0] = 0.0
|
||||
expected[6.0] = 0.0
|
||||
expected[7.0] = 0.0
|
||||
expected[8.0] = 0.0
|
||||
expected[9.0] = 0.0
|
||||
expected[10.0] = 0.0
|
||||
HistogramHelper(c, client, "select Histogram(value, 1.0, -10, 10) from test_histogram", expected)
|
||||
|
||||
// Test specifying start and stop inside domain of data
|
||||
expected = make(map[float64]float64, 2)
|
||||
expected[-1.0] = 1.0
|
||||
expected[0.0] = 1.0
|
||||
HistogramHelper(c, client, "select Histogram(value, 1.0, -1, 0) from test_histogram", expected)
|
||||
|
||||
// Test specifying step and start that don't align with 0
|
||||
expected = make(map[float64]float64, 4)
|
||||
expected[-3.0] = 2.0
|
||||
expected[-1.0] = 2.0
|
||||
expected[1.0] = 2.0
|
||||
expected[3.0] = 1.0
|
||||
HistogramHelper(c, client, "select Histogram(value, 2.0, -3) from test_histogram", expected)
|
||||
HistogramHelper(c, client, "select Histogram(value, 2.0, -3, 3) from test_histogram", expected)
|
||||
|
||||
// Test specifying step, start and stop that don't align with 0 inside the domain
|
||||
expected = make(map[float64]float64, 3)
|
||||
expected[-3.0] = 2.0
|
||||
expected[-1.0] = 2.0
|
||||
expected[1.0] = 2.0
|
||||
HistogramHelper(c, client, "select Histogram(value, 2.0, -3, 1) from test_histogram", expected)
|
||||
|
||||
// Test specifying step and start that don't align with stop
|
||||
expected = make(map[float64]float64, 2)
|
||||
expected[-1.0] = 2.0
|
||||
expected[1.0] = 2.0
|
||||
HistogramHelper(c, client, "select Histogram(value, 2.0, -1, 2) from test_histogram", expected)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue