Implement sum().

pull/17/head
Todd Persen 2013-10-17 16:59:41 -04:00
parent 9df7ab5ef0
commit 4bd569f576
2 changed files with 124 additions and 0 deletions

View File

@ -29,10 +29,12 @@ func init() {
registeredAggregators["count"] = NewCountAggregator
registeredAggregators["max"] = NewMaxAggregator
registeredAggregators["min"] = NewMinAggregator
registeredAggregators["sum"] = NewSumAggregator
registeredAggregators["percentile"] = NewPercentileAggregator
registeredAggregators["median"] = NewMedianAggregator
registeredAggregators["mean"] = NewMeanAggregator
registeredAggregators["mode"] = NewModeAggregator
/* registeredAggregators["distinct"] = NewDistinctAggregator */
registeredAggregators["__timestamp_aggregator"] = NewTimestampAggregator
}
@ -759,3 +761,92 @@ func NewMinAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)
values: make(map[string]map[interface{}]protocol.FieldValue),
}, nil
}
//
// Sum Aggregator
//
type SumAggregator struct {
fieldName string
fieldIndex int
fieldType protocol.FieldDefinition_Type
sums map[string]map[interface{}]float64
}
func (self *SumAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
sums := self.sums[series]
if sums == nil {
sums = make(map[interface{}]float64)
self.sums[series] = sums
}
currentValue := sums[group]
switch self.fieldType {
case protocol.FieldDefinition_INT64:
currentValue += float64(*p.Values[self.fieldIndex].Int64Value)
case protocol.FieldDefinition_INT32:
currentValue += float64(*p.Values[self.fieldIndex].IntValue)
case protocol.FieldDefinition_DOUBLE:
currentValue += *p.Values[self.fieldIndex].DoubleValue
}
sums[group] = currentValue
return nil
}
func (self *SumAggregator) ColumnName() string {
return "sum"
}
func (self *SumAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *SumAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
switch self.fieldType {
case protocol.FieldDefinition_INT32:
value := int32(self.sums[series][group])
return &protocol.FieldValue{IntValue: &value}
case protocol.FieldDefinition_INT64:
value := int64(self.sums[series][group])
return &protocol.FieldValue{Int64Value: &value}
case protocol.FieldDefinition_DOUBLE:
value := float64(self.sums[series][group])
return &protocol.FieldValue{DoubleValue: &value}
}
return &protocol.FieldValue{}
}
func (self *SumAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
for idx, field := range series.Fields {
if *field.Name == self.fieldName {
self.fieldIndex = idx
self.fieldType = *field.Type
switch self.fieldType {
case protocol.FieldDefinition_INT32,
protocol.FieldDefinition_INT64,
protocol.FieldDefinition_DOUBLE:
// that's fine
default:
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Field %s has invalid type %v", self.fieldName, self.fieldType))
}
return nil
}
}
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
}
func NewSumAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
if len(value.Elems) != 1 {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function max() requires only one argument")
}
return &SumAggregator{
fieldName: value.Elems[0].Name,
sums: make(map[string]map[interface{}]float64),
}, nil
}

View File

@ -829,6 +829,38 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) {
]`)
}
func (self *EngineSuite) TestSumQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
],
"name": "foo",
"fields": [
{ "type": "INT32", "name": "column_one" }
]
}
]`)
runQuery(engine, "select sum(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int_value": 11 }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "int_value": 16 }], "timestamp": 1381346760, "sequence_number": 1 }
],
"name": "foo",
"fields": [
{ "type": "INT32", "name": "sum" }
]
}
]`)
}
func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
@ -846,6 +878,7 @@ func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) {
{ "values": [{ "int_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
],
"name": "foo",