implement mode() for unimodal datasets

pull/17/head
Todd Persen 2013-10-16 13:34:58 -04:00
parent 19a0e52044
commit 37c39d5fef
2 changed files with 152 additions and 0 deletions

View File

@ -32,6 +32,7 @@ func init() {
registeredAggregators["percentile"] = NewPercentileAggregator
registeredAggregators["median"] = NewMedianAggregator
registeredAggregators["mean"] = NewMeanAggregator
registeredAggregators["mode"] = NewModeAggregator
registeredAggregators["__timestamp_aggregator"] = NewTimestampAggregator
}
@ -478,6 +479,117 @@ func NewPercentileAggregator(_ *parser.Query, value *parser.Value) (Aggregator,
}, nil
}
//
// Mode Aggregator
//
type ModeAggregator struct {
fieldName string
fieldIndex int
fieldType protocol.FieldDefinition_Type
counts map[string]map[interface{}]map[interface{}]int
}
func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
seriesCounts := self.counts[series]
if seriesCounts == nil {
seriesCounts = make(map[interface{}]map[interface{}]int)
self.counts[series] = seriesCounts
}
groupCounts := seriesCounts[group]
if groupCounts == nil {
groupCounts = make(map[interface{}]int)
}
var value interface{}
switch self.fieldType {
case protocol.FieldDefinition_INT32:
value = *p.Values[self.fieldIndex].IntValue
case protocol.FieldDefinition_INT64:
value = *p.Values[self.fieldIndex].Int64Value
case protocol.FieldDefinition_DOUBLE:
value = *p.Values[self.fieldIndex].DoubleValue
}
count := groupCounts[value]
count += 1
groupCounts[value] = count
seriesCounts[group] = groupCounts
return nil
}
func (self *ModeAggregator) ColumnName() string {
return "mode"
}
func (self *ModeAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *ModeAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
values := make([]interface{}, 0)
currentCount := 1
for value, count := range self.counts[series][group] {
if count == currentCount {
values = append(values, value)
} else if count > currentCount {
values = make([]interface{}, 0)
values = append(values, value)
currentCount = count
}
}
switch self.fieldType {
case protocol.FieldDefinition_INT32:
value := values[0].(int32)
return &protocol.FieldValue{IntValue: &value}
case protocol.FieldDefinition_INT64:
value := values[0].(int64)
return &protocol.FieldValue{Int64Value: &value}
case protocol.FieldDefinition_DOUBLE:
value := values[0].(float64)
return &protocol.FieldValue{DoubleValue: &value}
}
return &protocol.FieldValue{}
}
func (self *ModeAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
for idx, field := range series.Fields {
if *field.Name == self.fieldName {
self.fieldIndex = idx
self.fieldType = *field.Type
switch self.fieldType {
case protocol.FieldDefinition_INT32,
protocol.FieldDefinition_INT64,
protocol.FieldDefinition_DOUBLE:
// that's fine
default:
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType))
}
return nil
}
}
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
}
func NewModeAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
if len(value.Elems) != 1 {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function mode() requires exactly one argument")
}
return &ModeAggregator{
fieldName: value.Elems[0].Name,
counts: make(map[string]map[interface{}]map[interface{}]int),
}, nil
}
//
// Max Aggregator
//

View File

@ -840,6 +840,46 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) {
]`)
}
func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
],
"name": "foo",
"fields": [
{ "type": "INT32", "name": "column_one" }
]
}
]`)
runQuery(engine, "select mode(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int_value": 1 }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "int_value": 3 }], "timestamp": 1381346760, "sequence_number": 1 }
],
"name": "foo",
"fields": [
{ "type": "INT32", "name": "mode" }
]
}
]`)
}
func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) {
err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
engine := createEngine(c, `[]`)