combine median and percentile.
parent
cac571d4fb
commit
6c30299e36
|
@ -221,112 +221,15 @@ func NewMeanAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)
|
|||
counts: make(map[string]map[interface{}]int),
|
||||
}, nil
|
||||
}
|
||||
|
||||
//
|
||||
// Median Aggregator
|
||||
//
|
||||
|
||||
type MedianAggregator struct {
|
||||
fieldName string
|
||||
fieldIndex int
|
||||
fieldType protocol.FieldDefinition_Type
|
||||
int_values map[string]map[interface{}][]int64
|
||||
float_values map[string]map[interface{}][]float64
|
||||
}
|
||||
|
||||
func (self *MedianAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
|
||||
switch self.fieldType {
|
||||
case protocol.FieldDefinition_INT64:
|
||||
int_values := self.int_values[series]
|
||||
if int_values == nil {
|
||||
int_values = make(map[interface{}][]int64)
|
||||
self.int_values[series] = int_values
|
||||
}
|
||||
|
||||
points := int_values[group]
|
||||
if points == nil {
|
||||
points = []int64{}
|
||||
}
|
||||
|
||||
points = append(points, *p.Values[self.fieldIndex].Int64Value)
|
||||
int_values[group] = points
|
||||
case protocol.FieldDefinition_DOUBLE:
|
||||
float_values := self.float_values[series]
|
||||
if float_values == nil {
|
||||
float_values = make(map[interface{}][]float64)
|
||||
self.float_values[series] = float_values
|
||||
}
|
||||
|
||||
points := float_values[group]
|
||||
if points == nil {
|
||||
points = make([]float64, 0)
|
||||
}
|
||||
|
||||
points = append(points, *p.Values[self.fieldIndex].DoubleValue)
|
||||
float_values[group] = points
|
||||
default:
|
||||
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *MedianAggregator) ColumnName() string {
|
||||
return "median"
|
||||
}
|
||||
|
||||
func (self *MedianAggregator) ColumnType() protocol.FieldDefinition_Type {
|
||||
return self.fieldType
|
||||
}
|
||||
|
||||
func (self *MedianAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
|
||||
returnValues := []*protocol.FieldValue{}
|
||||
|
||||
switch self.fieldType {
|
||||
case protocol.FieldDefinition_INT64:
|
||||
SortInt64(self.int_values[series][group])
|
||||
length := len(self.int_values[series][group])
|
||||
index := int(math.Floor(float64(length)*0.5+0.5)) - 1
|
||||
point := int64(self.int_values[series][group][index])
|
||||
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &point})
|
||||
case protocol.FieldDefinition_DOUBLE:
|
||||
sort.Float64s(self.float_values[series][group])
|
||||
length := len(self.float_values[series][group])
|
||||
index := int(math.Floor(float64(length)*0.5+0.5)) - 1
|
||||
point := self.float_values[series][group][index]
|
||||
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &point})
|
||||
}
|
||||
return returnValues
|
||||
}
|
||||
|
||||
func (self *MedianAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
|
||||
for idx, field := range series.Fields {
|
||||
if *field.Name == self.fieldName {
|
||||
self.fieldIndex = idx
|
||||
self.fieldType = *field.Type
|
||||
|
||||
switch self.fieldType {
|
||||
case protocol.FieldDefinition_INT64,
|
||||
protocol.FieldDefinition_DOUBLE:
|
||||
// that's fine
|
||||
default:
|
||||
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
|
||||
}
|
||||
|
||||
func NewMedianAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
|
||||
if len(value.Elems) != 1 {
|
||||
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function median() requires exactly one argument")
|
||||
}
|
||||
|
||||
return &MedianAggregator{
|
||||
return &PercentileAggregator{
|
||||
functionName: "median",
|
||||
fieldName: value.Elems[0].Name,
|
||||
percentile: 50.0,
|
||||
int_values: make(map[string]map[interface{}][]int64),
|
||||
float_values: make(map[string]map[interface{}][]float64),
|
||||
}, nil
|
||||
|
@ -337,6 +240,7 @@ func NewMedianAggregator(_ *parser.Query, value *parser.Value) (Aggregator, erro
|
|||
//
|
||||
|
||||
type PercentileAggregator struct {
|
||||
functionName string
|
||||
fieldName string
|
||||
fieldIndex int
|
||||
fieldType protocol.FieldDefinition_Type
|
||||
|
@ -383,7 +287,7 @@ func (self *PercentileAggregator) AggregatePoint(series string, group interface{
|
|||
}
|
||||
|
||||
func (self *PercentileAggregator) ColumnName() string {
|
||||
return "percentile"
|
||||
return self.functionName
|
||||
}
|
||||
|
||||
func (self *PercentileAggregator) ColumnType() protocol.FieldDefinition_Type {
|
||||
|
@ -443,6 +347,7 @@ func NewPercentileAggregator(_ *parser.Query, value *parser.Value) (Aggregator,
|
|||
}
|
||||
|
||||
return &PercentileAggregator{
|
||||
functionName: "percentile",
|
||||
fieldName: value.Elems[0].Name,
|
||||
percentile: percentile,
|
||||
int_values: make(map[string]map[interface{}][]int64),
|
||||
|
|
Loading…
Reference in New Issue