Basic percentile calculations work.

pull/17/head
Todd Persen 2013-10-15 15:31:23 -04:00
parent 9e7ca7dcde
commit a8d922680c
2 changed files with 140 additions and 37 deletions

View File

@ -3,8 +3,10 @@ package engine
import (
"common"
"fmt"
"math"
"parser"
"protocol"
"strconv"
"time"
)
@ -28,6 +30,10 @@ func init() {
registeredAggregators["__timestamp_aggregator"] = NewTimestampAggregator
}
//
// Count Aggregator
//
type CountAggregator struct {
counts map[string]map[interface{}]int32
}
@ -61,43 +67,9 @@ func NewCountAggregator(*parser.Query, *parser.Value) (Aggregator, error) {
return &CountAggregator{make(map[string]map[interface{}]int32)}, nil
}
type PercentileAggregator struct {
percentiles map[string]map[interface{}]int32
}
func (self *PercentileAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
percentiles := self.percentiles[series]
if percentiles == nil {
percentiles = make(map[interface{}]int32)
self.percentiles[series] = percentiles
}
percentiles[group]++
return nil
}
func (self *PercentileAggregator) ColumnName() string {
return "percentile"
}
func (self *PercentileAggregator) ColumnType() protocol.FieldDefinition_Type {
return protocol.FieldDefinition_DOUBLE
}
func (self *PercentileAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
value := self.percentiles[series][group]
return &protocol.FieldValue{IntValue: &value}
}
func (self *PercentileAggregator) InitializeFieldsMetadata(series *protocol.Series) error { return nil }
func NewPercentileAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
if len(value.Elems) != 2 {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function percentile(...) requires exactly two arguments")
}
fmt.Printf("CREATING NEW PercentileAggregator")
return &PercentileAggregator{make(map[string]map[interface{}]int32)}, nil
}
//
// Timestamp Aggregator
//
type TimestampAggregator struct {
duration *time.Duration
@ -145,6 +117,97 @@ func NewTimestampAggregator(query *parser.Query, _ *parser.Value) (Aggregator, e
}, nil
}
//
// Percentile Aggregator
//
type PercentileAggregator struct {
fieldName string
fieldIndex int
fieldType protocol.FieldDefinition_Type
percentile int
values map[string]map[interface{}][]protocol.Point
}
func (self *PercentileAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
values := self.values[series]
if values == nil {
values = make(map[interface{}][]protocol.Point)
self.values[series] = values
}
points := values[group]
if points == nil {
points = make([]protocol.Point, 0)
}
points = append(points, *p)
values[group] = points
fmt.Println(values[group])
return nil
}
func (self *PercentileAggregator) ColumnName() string {
return "percentile"
}
func (self *PercentileAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *PercentileAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
length := len(self.values[series][group])
index := int(math.Floor(float64(length)*float64(self.percentile)/100.0+0.5)) - 1
point := self.values[series][group][index]
return point.Values[0]
}
func (self *PercentileAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
for idx, field := range series.Fields {
if *field.Name == self.fieldName {
self.fieldIndex = idx
self.fieldType = *field.Type
switch self.fieldType {
case protocol.FieldDefinition_INT32,
protocol.FieldDefinition_INT64,
protocol.FieldDefinition_DOUBLE:
// that's fine
default:
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType))
}
return nil
}
}
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
}
func NewPercentileAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
if len(value.Elems) != 2 {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function percentile(...) requires exactly two arguments")
}
percentile, err := strconv.Atoi(value.Elems[1].Name)
if err != nil {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "not an integer")
}
return &PercentileAggregator{
fieldName: value.Elems[0].Name,
percentile: percentile,
values: make(map[string]map[interface{}][]protocol.Point),
}, nil
}
//
// Max Aggregator
//
type MaxAggregator struct {
fieldName string
fieldIndex int
@ -226,6 +289,10 @@ func NewMaxAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)
}, nil
}
//
// Min Aggregator
//
type MinAggregator struct {
fieldName string
fieldIndex int

View File

@ -800,6 +800,42 @@ func (self *EngineSuite) TestMaxMinQueryWithGroupByTime(c *C) {
`)
}
func (self *EngineSuite) TestPercentileQueryWithGroupByTime(c *C) {
// make the mock coordinator return some data
engine := createEngine(c, `
[
{
"points": [
{ "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 2 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 3 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 7 }], "timestamp": 1381346701, "sequence_number": 1 }
],
"name": "foo",
"fields": [
{ "type": "INT32", "name": "column_one" }
]
}
]
`)
runQuery(engine, "select percentile(column_one, 80) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int_value": 6 }], "timestamp": 1381346700, "sequence_number": 1 }
],
"name": "foo",
"fields": [
{ "type": "INT32", "name": "percentile" }
]
}
]
`)
}
func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) {
err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
engine := createEngine(c, `[]`)