Update aggregators to support returning of multi-value results.

pull/17/head
Todd Persen 2013-10-17 19:21:48 -04:00
parent 9254aa8b54
commit 27b62caa29
2 changed files with 75 additions and 42 deletions

View File

@ -16,7 +16,7 @@ type PointSlice []protocol.Point
type Aggregator interface {
AggregatePoint(series string, group interface{}, p *protocol.Point) error
InitializeFieldsMetadata(series *protocol.Series) error
GetValue(series string, group interface{}) *protocol.FieldValue
GetValue(series string, group interface{}) []*protocol.FieldValue
ColumnName() string
ColumnType() protocol.FieldDefinition_Type
}
@ -64,9 +64,12 @@ func (self *CountAggregator) ColumnType() protocol.FieldDefinition_Type {
return protocol.FieldDefinition_INT32
}
func (self *CountAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *CountAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
value := self.counts[series][group]
return &protocol.FieldValue{IntValue: &value}
returnValues = append(returnValues, &protocol.FieldValue{IntValue: &value})
return returnValues
}
func (self *CountAggregator) InitializeFieldsMetadata(series *protocol.Series) error { return nil }
@ -106,9 +109,12 @@ func (self *TimestampAggregator) ColumnType() protocol.FieldDefinition_Type {
return protocol.FieldDefinition_INT32
}
func (self *TimestampAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *TimestampAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
value := self.timestamps[series][group]
return &protocol.FieldValue{Int64Value: &value}
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &value})
return returnValues
}
func (self *TimestampAggregator) InitializeFieldsMetadata(series *protocol.Series) error { return nil }
@ -177,9 +183,12 @@ func (self *MeanAggregator) ColumnType() protocol.FieldDefinition_Type {
return protocol.FieldDefinition_DOUBLE
}
func (self *MeanAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *MeanAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
mean := self.means[series][group]
return &protocol.FieldValue{DoubleValue: &mean}
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &mean})
return returnValues
}
func (self *MeanAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
@ -287,28 +296,30 @@ func (self *MedianAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *MedianAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *MedianAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
switch self.fieldType {
case protocol.FieldDefinition_INT32:
sort.Ints(self.int_values[series][group])
length := len(self.int_values[series][group])
index := int(math.Floor(float64(length)*0.5+0.5)) - 1
point := int32(self.int_values[series][group][index])
return &protocol.FieldValue{IntValue: &point}
returnValues = append(returnValues, &protocol.FieldValue{IntValue: &point})
case protocol.FieldDefinition_INT64:
sort.Ints(self.int_values[series][group])
length := len(self.int_values[series][group])
index := int(math.Floor(float64(length)*0.5+0.5)) - 1
point := int64(self.int_values[series][group][index])
return &protocol.FieldValue{Int64Value: &point}
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &point})
case protocol.FieldDefinition_DOUBLE:
sort.Float64s(self.float_values[series][group])
length := len(self.float_values[series][group])
index := int(math.Floor(float64(length)*0.5+0.5)) - 1
point := self.float_values[series][group][index]
return &protocol.FieldValue{DoubleValue: &point}
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &point})
}
return &protocol.FieldValue{}
return returnValues
}
func (self *MedianAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
@ -417,28 +428,31 @@ func (self *PercentileAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *PercentileAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *PercentileAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
switch self.fieldType {
case protocol.FieldDefinition_INT32:
sort.Ints(self.int_values[series][group])
length := len(self.int_values[series][group])
index := int(math.Floor(float64(length)*self.percentile/100.0+0.5)) - 1
point := int32(self.int_values[series][group][index])
return &protocol.FieldValue{IntValue: &point}
returnValues = append(returnValues, &protocol.FieldValue{IntValue: &point})
case protocol.FieldDefinition_INT64:
sort.Ints(self.int_values[series][group])
length := len(self.int_values[series][group])
index := int(math.Floor(float64(length)*self.percentile/100.0+0.5)) - 1
point := int64(self.int_values[series][group][index])
return &protocol.FieldValue{Int64Value: &point}
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &point})
case protocol.FieldDefinition_DOUBLE:
sort.Float64s(self.float_values[series][group])
length := len(self.float_values[series][group])
index := int(math.Floor(float64(length)*self.percentile/100.0+0.5)) - 1
point := self.float_values[series][group][index]
return &protocol.FieldValue{DoubleValue: &point}
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &point})
}
return &protocol.FieldValue{}
return returnValues
}
func (self *PercentileAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
@ -530,7 +544,9 @@ func (self *ModeAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *ModeAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *ModeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
values := make([]interface{}, 0)
currentCount := 1
@ -544,19 +560,21 @@ func (self *ModeAggregator) GetValue(series string, group interface{}) *protocol
}
}
switch self.fieldType {
case protocol.FieldDefinition_INT32:
value := values[0].(int32)
return &protocol.FieldValue{IntValue: &value}
case protocol.FieldDefinition_INT64:
value := values[0].(int64)
return &protocol.FieldValue{Int64Value: &value}
case protocol.FieldDefinition_DOUBLE:
value := values[0].(float64)
return &protocol.FieldValue{DoubleValue: &value}
for _, value := range values {
switch self.fieldType {
case protocol.FieldDefinition_INT32:
v := value.(int32)
returnValues = append(returnValues, &protocol.FieldValue{IntValue: &v})
case protocol.FieldDefinition_INT64:
v := value.(int64)
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &v})
case protocol.FieldDefinition_DOUBLE:
v := value.(float64)
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &v})
}
}
return &protocol.FieldValue{}
return returnValues
}
func (self *ModeAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
@ -639,9 +657,11 @@ func (self *MaxAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *MaxAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *MaxAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
value := self.values[series][group]
return &value
returnValues = append(returnValues, &value)
return returnValues
}
func (self *MaxAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
@ -724,9 +744,11 @@ func (self *MinAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *MinAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *MinAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
value := self.values[series][group]
return &value
returnValues = append(returnValues, &value)
return returnValues
}
func (self *MinAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
@ -803,19 +825,22 @@ func (self *SumAggregator) ColumnType() protocol.FieldDefinition_Type {
return self.fieldType
}
func (self *SumAggregator) GetValue(series string, group interface{}) *protocol.FieldValue {
func (self *SumAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
switch self.fieldType {
case protocol.FieldDefinition_INT32:
value := int32(self.sums[series][group])
return &protocol.FieldValue{IntValue: &value}
returnValues = append(returnValues, &protocol.FieldValue{IntValue: &value})
case protocol.FieldDefinition_INT64:
value := int64(self.sums[series][group])
return &protocol.FieldValue{Int64Value: &value}
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &value})
case protocol.FieldDefinition_DOUBLE:
value := float64(self.sums[series][group])
return &protocol.FieldValue{DoubleValue: &value}
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &value})
}
return &protocol.FieldValue{}
return returnValues
}
func (self *SumAggregator) InitializeFieldsMetadata(series *protocol.Series) error {

View File

@ -260,8 +260,6 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(database string, query *pa
}
var sequenceNumber uint32 = 1
/* fields := []*protocol.FieldDefinition{} */
fields := []*protocol.FieldDefinition{}
for _, aggregator := range aggregators {
@ -283,7 +281,8 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(database string, query *pa
tempTable := table
points := []*protocol.Point{}
for groupId, _ := range tableGroups {
timestamp := *timestampAggregator.GetValue(table, groupId).Int64Value
timestamp := *timestampAggregator.GetValue(table, groupId)[0].Int64Value
/* groupPoints := []*protocol.Point{} */
point := &protocol.Point{
Timestamp: &timestamp,
SequenceNumber: &sequenceNumber,
@ -291,7 +290,16 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(database string, query *pa
}
for _, aggregator := range aggregators {
point.Values = append(point.Values, aggregator.GetValue(table, groupId))
// point.Values = append(point.Values, aggregator.GetValue(table, groupId)[0])
returnValues := aggregator.GetValue(table, groupId)
returnDepth := len(returnValues)
for _, value := range returnValues {
if returnDepth > 1 {
// do some crazy shit
} else {
point.Values = append(point.Values, value)
}
}
}
for idx, _ := range groupBy {