Add support for distinct()

pull/17/head
Todd Persen 2013-10-18 12:56:31 -04:00
parent 930ecf9448
commit d22654bf8d
1 changed files with 99 additions and 1 deletions

View File

@ -34,7 +34,7 @@ func init() {
registeredAggregators["median"] = NewMedianAggregator
registeredAggregators["mean"] = NewMeanAggregator
registeredAggregators["mode"] = NewModeAggregator
/* registeredAggregators["distinct"] = NewDistinctAggregator */
registeredAggregators["distinct"] = NewDistinctAggregator
registeredAggregators["__timestamp_aggregator"] = NewTimestampAggregator
}
@ -610,6 +610,104 @@ func NewModeAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)
}, nil
}
//
// Distinct Aggregator
//
type DistinctAggregator struct {
fieldName string
fieldIndex int
fieldType protocol.FieldDefinition_Type
counts map[string]map[interface{}]map[interface{}]int
}
func (self *DistinctAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
seriesCounts := self.counts[series]
if seriesCounts == nil {
seriesCounts = make(map[interface{}]map[interface{}]int)
self.counts[series] = seriesCounts
}
groupCounts := seriesCounts[group]
if groupCounts == nil {
groupCounts = make(map[interface{}]int)
}
var value interface{}
switch self.fieldType {
case protocol.FieldDefinition_INT32:
value = *p.Values[self.fieldIndex].IntValue
case protocol.FieldDefinition_INT64:
value = *p.Values[self.fieldIndex].Int64Value
case protocol.FieldDefinition_DOUBLE:
value = *p.Values[self.fieldIndex].DoubleValue
}
count := groupCounts[value]
count += 1
groupCounts[value] = count
seriesCounts[group] = groupCounts
return nil
}
func (self *DistinctAggregator) ColumnName() string {
return "distinct"
}
func (self *DistinctAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *DistinctAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
for value, _ := range self.counts[series][group] {
switch self.fieldType {
case protocol.FieldDefinition_INT32:
v := value.(int32)
returnValues = append(returnValues, &protocol.FieldValue{IntValue: &v})
case protocol.FieldDefinition_INT64:
v := value.(int64)
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &v})
case protocol.FieldDefinition_DOUBLE:
v := value.(float64)
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &v})
}
}
return returnValues
}
func (self *DistinctAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
for idx, field := range series.Fields {
if *field.Name == self.fieldName {
self.fieldIndex = idx
self.fieldType = *field.Type
switch self.fieldType {
case protocol.FieldDefinition_INT32,
protocol.FieldDefinition_INT64,
protocol.FieldDefinition_DOUBLE:
// that's fine
default:
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType))
}
return nil
}
}
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
}
func NewDistinctAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
return &DistinctAggregator{
fieldName: value.Elems[0].Name,
counts: make(map[string]map[interface{}]map[interface{}]int),
}, nil
}
//
// Max Aggregator
//