diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go
index 9f2e197558..efe5d13de4 100644
--- a/src/engine/aggregator.go
+++ b/src/engine/aggregator.go
@@ -32,6 +32,7 @@ func init() {
 	registeredAggregators["percentile"] = NewPercentileAggregator
 	registeredAggregators["median"] = NewMedianAggregator
 	registeredAggregators["mean"] = NewMeanAggregator
+	registeredAggregators["mode"] = NewModeAggregator
 	registeredAggregators["__timestamp_aggregator"] = NewTimestampAggregator
 }
 
@@ -478,6 +479,168 @@ func NewPercentileAggregator(_ *parser.Query, value *parser.Value) (Aggregator,
 	}, nil
 }
 
+//
+// Mode Aggregator
+//
+
+// ModeAggregator computes the mode (the most frequently occurring value) of
+// a numeric field, tracked separately for every series and group.
+type ModeAggregator struct {
+	// fieldName is the column name given as the argument of mode().
+	fieldName string
+	// fieldIndex and fieldType are filled in by InitializeFieldsMetadata.
+	fieldIndex int
+	fieldType  protocol.FieldDefinition_Type
+	// counts maps series name -> group -> observed value -> occurrences.
+	counts map[string]map[interface{}]map[interface{}]int
+}
+
+// AggregatePoint records one occurrence of the point's value for the
+// aggregated field under the given series and group. A point whose field
+// carries no value of the expected type is skipped rather than dereferencing
+// a nil pointer. It always returns nil.
+func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
+	fieldValue := p.Values[self.fieldIndex]
+
+	var value interface{}
+	switch self.fieldType {
+	case protocol.FieldDefinition_INT32:
+		if fieldValue.IntValue == nil {
+			return nil
+		}
+		value = *fieldValue.IntValue
+	case protocol.FieldDefinition_INT64:
+		if fieldValue.Int64Value == nil {
+			return nil
+		}
+		value = *fieldValue.Int64Value
+	case protocol.FieldDefinition_DOUBLE:
+		if fieldValue.DoubleValue == nil {
+			return nil
+		}
+		value = *fieldValue.DoubleValue
+	}
+
+	seriesCounts := self.counts[series]
+	if seriesCounts == nil {
+		seriesCounts = make(map[interface{}]map[interface{}]int)
+		self.counts[series] = seriesCounts
+	}
+
+	groupCounts := seriesCounts[group]
+	if groupCounts == nil {
+		groupCounts = make(map[interface{}]int)
+		seriesCounts[group] = groupCounts
+	}
+
+	groupCounts[value]++
+	return nil
+}
+
+// ColumnName returns the name of the result column, "mode".
+func (self *ModeAggregator) ColumnName() string {
+	return "mode"
+}
+
+// ColumnType returns the type of the aggregated field; the mode is reported
+// with that same type.
+func (self *ModeAggregator) ColumnType() protocol.FieldDefinition_Type {
+	return self.fieldType
+}
+
+// GetValue returns the most frequent value recorded for the series and
+// group, stored in the FieldValue member matching the field's type. When
+// several values share the highest count the smallest one wins, so the
+// result does not depend on Go's randomized map iteration order. An empty
+// FieldValue is returned when nothing was recorded for the group.
+func (self *ModeAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
+	var mode interface{}
+	highestCount := 0
+
+	for value, count := range self.counts[series][group] {
+		if count > highestCount || (count == highestCount && modeValueLess(value, mode)) {
+			mode = value
+			highestCount = count
+		}
+	}
+
+	// The assertions are checked: with nothing recorded mode is nil, every
+	// assertion fails and the empty FieldValue below is returned.
+	switch self.fieldType {
+	case protocol.FieldDefinition_INT32:
+		if value, ok := mode.(int32); ok {
+			return &protocol.FieldValue{IntValue: &value}
+		}
+	case protocol.FieldDefinition_INT64:
+		if value, ok := mode.(int64); ok {
+			return &protocol.FieldValue{Int64Value: &value}
+		}
+	case protocol.FieldDefinition_DOUBLE:
+		if value, ok := mode.(float64); ok {
+			return &protocol.FieldValue{DoubleValue: &value}
+		}
+	}
+
+	return &protocol.FieldValue{}
+}
+
+// modeValueLess reports whether a orders before b. Both are expected to hold
+// the same numeric type (int32, int64 or float64); any other combination
+// yields false.
+func modeValueLess(a, b interface{}) bool {
+	switch left := a.(type) {
+	case int32:
+		right, ok := b.(int32)
+		return ok && left < right
+	case int64:
+		right, ok := b.(int64)
+		return ok && left < right
+	case float64:
+		right, ok := b.(float64)
+		return ok && left < right
+	}
+	return false
+}
+
+// InitializeFieldsMetadata looks up the aggregated field in the series
+// definition and remembers its index and type. It returns an InvalidArgument
+// query error when the field is missing or is not INT32, INT64 or DOUBLE.
+func (self *ModeAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
+	for idx, field := range series.Fields {
+		if *field.Name != self.fieldName {
+			continue
+		}
+
+		self.fieldIndex = idx
+		self.fieldType = *field.Type
+
+		switch self.fieldType {
+		case protocol.FieldDefinition_INT32,
+			protocol.FieldDefinition_INT64,
+			protocol.FieldDefinition_DOUBLE:
+			return nil
+		}
+
+		return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType))
+	}
+
+	return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
+}
+
+// NewModeAggregator builds a ModeAggregator for a mode(<column>) call. It
+// returns a WrongNumberOfArguments query error unless exactly one argument
+// was given.
+func NewModeAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
+	if len(value.Elems) != 1 {
+		return nil, common.NewQueryError(common.WrongNumberOfArguments, "function mode() requires exactly one argument")
+	}
+
+	return &ModeAggregator{
+		fieldName: value.Elems[0].Name,
+		counts:    make(map[string]map[interface{}]map[interface{}]int),
+	}, nil
+}
+
 //
 // Max Aggregator
 //
diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go
index 20bc9252fc..daed363479 100644
--- a/src/engine/engine_test.go
+++ b/src/engine/engine_test.go
@@ -840,6 +840,47 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) {
 	]`)
 }
 
+// TestModeQueryWithGroupByTime checks that mode() reports the most frequent value of each time group.
+func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) {
+	engine := createEngine(c, `[
+    {
+      "points": [
+        { "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
+        { "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
+        { "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
+        { "values": [{ "int_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
+        { "values": [{ "int_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 },
+        { "values": [{ "int_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
+        { "values": [{ "int_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
+        { "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 },
+        { "values": [{ "int_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
+        { "values": [{ "int_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 },
+        { "values": [{ "int_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 },
+        { "values": [{ "int_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
+        { "values": [{ "int_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 },
+        { "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
+      ],
+      "name": "foo",
+      "fields": [
+        { "type": "INT32", "name": "column_one" }
+      ]
+    }
+  ]`)
+
+	runQuery(engine, "select mode(column_one) from foo group by time(1m);", c, `[
+    {
+      "points": [
+        { "values": [{ "int_value": 1 }], "timestamp": 1381346700, "sequence_number": 1 },
+        { "values": [{ "int_value": 3 }], "timestamp": 1381346760, "sequence_number": 1 }
+      ],
+      "name": "foo",
+      "fields": [
+        { "type": "INT32", "name": "mode" }
+      ]
+    }
+  ]`)
+}
+
 func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) {
 	err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
 	engine := createEngine(c, `[]`)