From d22654bf8d7ea5aa74051b21d8390e66d29020fd Mon Sep 17 00:00:00 2001 From: Todd Persen Date: Fri, 18 Oct 2013 12:56:31 -0400 Subject: [PATCH] Add support for distinct() --- src/engine/aggregator.go | 100 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index 9ff739fafe..76a4a11f91 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -34,7 +34,7 @@ func init() { registeredAggregators["median"] = NewMedianAggregator registeredAggregators["mean"] = NewMeanAggregator registeredAggregators["mode"] = NewModeAggregator - /* registeredAggregators["distinct"] = NewDistinctAggregator */ + registeredAggregators["distinct"] = NewDistinctAggregator registeredAggregators["__timestamp_aggregator"] = NewTimestampAggregator } @@ -610,6 +610,104 @@ func NewModeAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) }, nil } +// +// Distinct Aggregator +// + +type DistinctAggregator struct { + fieldName string + fieldIndex int + fieldType protocol.FieldDefinition_Type + counts map[string]map[interface{}]map[interface{}]int +} + +func (self *DistinctAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error { + seriesCounts := self.counts[series] + if seriesCounts == nil { + seriesCounts = make(map[interface{}]map[interface{}]int) + self.counts[series] = seriesCounts + } + + groupCounts := seriesCounts[group] + if groupCounts == nil { + groupCounts = make(map[interface{}]int) + } + + var value interface{} + switch self.fieldType { + case protocol.FieldDefinition_INT32: + value = *p.Values[self.fieldIndex].IntValue + case protocol.FieldDefinition_INT64: + value = *p.Values[self.fieldIndex].Int64Value + case protocol.FieldDefinition_DOUBLE: + value = *p.Values[self.fieldIndex].DoubleValue + } + + count := groupCounts[value] + count += 1 + groupCounts[value] = count + seriesCounts[group] = groupCounts + + return nil +} + +func (self *DistinctAggregator) ColumnName() string { + return "distinct" +} + +func (self *DistinctAggregator) ColumnType() protocol.FieldDefinition_Type { + return self.fieldType +} + +func (self *DistinctAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { + returnValues := []*protocol.FieldValue{} + + for value, _ := range self.counts[series][group] { + switch self.fieldType { + case protocol.FieldDefinition_INT32: + v := value.(int32) + returnValues = append(returnValues, &protocol.FieldValue{IntValue: &v}) + case protocol.FieldDefinition_INT64: + v := value.(int64) + returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &v}) + case protocol.FieldDefinition_DOUBLE: + v := value.(float64) + returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &v}) + } + } + + return returnValues +} + +func (self *DistinctAggregator) InitializeFieldsMetadata(series *protocol.Series) error { + for idx, field := range series.Fields { + if *field.Name == self.fieldName { + self.fieldIndex = idx + self.fieldType = *field.Type + + switch self.fieldType { + case protocol.FieldDefinition_INT32, + protocol.FieldDefinition_INT64, + protocol.FieldDefinition_DOUBLE: + // that's fine + default: + return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType)) + } + + return nil + } + } + + return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName)) +} + +func NewDistinctAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) { + return &DistinctAggregator{ + fieldName: value.Elems[0].Name, + counts: make(map[string]map[interface{}]map[interface{}]int), + }, nil +} + // // Max Aggregator //