From 4bd569f576f37f4282f04d483c3e72b0ca4f13d8 Mon Sep 17 00:00:00 2001 From: Todd Persen Date: Thu, 17 Oct 2013 16:59:41 -0400 Subject: [PATCH] Implement sum(). --- src/engine/aggregator.go | 91 +++++++++++++++++++++++++++++++++++++++ src/engine/engine_test.go | 33 ++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index efe5d13de4..fa45c990ec 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -29,10 +29,12 @@ func init() { registeredAggregators["count"] = NewCountAggregator registeredAggregators["max"] = NewMaxAggregator registeredAggregators["min"] = NewMinAggregator + registeredAggregators["sum"] = NewSumAggregator registeredAggregators["percentile"] = NewPercentileAggregator registeredAggregators["median"] = NewMedianAggregator registeredAggregators["mean"] = NewMeanAggregator registeredAggregators["mode"] = NewModeAggregator + /* registeredAggregators["distinct"] = NewDistinctAggregator */ registeredAggregators["__timestamp_aggregator"] = NewTimestampAggregator } @@ -759,3 +761,92 @@ func NewMinAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) values: make(map[string]map[interface{}]protocol.FieldValue), }, nil } + +// +// Sum Aggregator +// + +type SumAggregator struct { + fieldName string + fieldIndex int + fieldType protocol.FieldDefinition_Type + sums map[string]map[interface{}]float64 +} + +func (self *SumAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error { + sums := self.sums[series] + if sums == nil { + sums = make(map[interface{}]float64) + self.sums[series] = sums + } + + currentValue := sums[group] + + switch self.fieldType { + case protocol.FieldDefinition_INT64: + currentValue += float64(*p.Values[self.fieldIndex].Int64Value) + case protocol.FieldDefinition_INT32: + currentValue += float64(*p.Values[self.fieldIndex].IntValue) + case protocol.FieldDefinition_DOUBLE: + currentValue += *p.Values[self.fieldIndex].DoubleValue + } + + sums[group] = currentValue + return nil +} + +func (self *SumAggregator) ColumnName() string { + return "sum" +} + +func (self *SumAggregator) ColumnType() protocol.FieldDefinition_Type { + return self.fieldType +} + +func (self *SumAggregator) GetValue(series string, group interface{}) *protocol.FieldValue { + switch self.fieldType { + case protocol.FieldDefinition_INT32: + value := int32(self.sums[series][group]) + return &protocol.FieldValue{IntValue: &value} + case protocol.FieldDefinition_INT64: + value := int64(self.sums[series][group]) + return &protocol.FieldValue{Int64Value: &value} + case protocol.FieldDefinition_DOUBLE: + value := float64(self.sums[series][group]) + return &protocol.FieldValue{DoubleValue: &value} + } + return &protocol.FieldValue{} +} + +func (self *SumAggregator) InitializeFieldsMetadata(series *protocol.Series) error { + for idx, field := range series.Fields { + if *field.Name == self.fieldName { + self.fieldIndex = idx + self.fieldType = *field.Type + + switch self.fieldType { + case protocol.FieldDefinition_INT32, + protocol.FieldDefinition_INT64, + protocol.FieldDefinition_DOUBLE: + // that's fine + default: + return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType)) + } + + return nil + } + } + + return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName)) +} + +func NewSumAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) { + if len(value.Elems) != 1 { + return nil, common.NewQueryError(common.WrongNumberOfArguments, "function max() requires only one argument") + } + + return &SumAggregator{ + fieldName: value.Elems[0].Name, + sums: make(map[string]map[interface{}]float64), + }, nil +} diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go index 6d475f994d..9b91931705 100644 --- a/src/engine/engine_test.go +++ b/src/engine/engine_test.go @@ -829,6 +829,38 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) { ]`) } +func (self *EngineSuite) TestSumQueryWithGroupByTime(c *C) { + engine := createEngine(c, `[ + { + "points": [ + { "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, + { "values": [{ "int_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 }, + { "values": [{ "int_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 }, + { "values": [{ "int_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 }, + { "values": [{ "int_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 }, + { "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 } + ], + "name": "foo", + "fields": [ + { "type": "INT32", "name": "column_one" } + ] + } + ]`) + + runQuery(engine, "select sum(column_one) from foo group by time(1m);", c, `[ + { + "points": [ + { "values": [{ "int_value": 11 }], "timestamp": 1381346700, "sequence_number": 1 }, + { "values": [{ "int_value": 16 }], "timestamp": 1381346760, "sequence_number": 1 } + ], + "name": "foo", + "fields": [ + { "type": "INT32", "name": "sum" } + ] + } + ]`) +} + func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { @@ -846,6 +878,7 @@ func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) { { "values": [{ "int_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 }, { "values": [{ "int_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 }, { "values": [{ "int_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 }, + { "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }, { "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 } ], "name": "foo",