From c3807b7f85ff5789dc8b44a755a419d3dc4fbb49 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Tue, 19 Nov 2013 16:13:23 -0500 Subject: [PATCH] fix #59. Add histogram aggregate function --- CHANGELOG.md | 1 + src/engine/aggregator.go | 234 ++++++++++++++++++++++++++++---------- src/engine/engine.go | 22 ++-- src/engine/engine_test.go | 197 ++++++++++++++++++++++++++++++++ 4 files changed, 385 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 224ecf1d52..cd8a79d0f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -119,6 +119,7 @@ - [Issue #35](https://github.com/influxdb/influxdb/issues/35). Support table aliases in Join Queries - [Issue #71](https://github.com/influxdb/influxdb/issues/71). Add WillReturnSingleSeries to the Query - [Issue #61](https://github.com/influxdb/influxdb/issues/61). Limit should default to 10k +- [Issue #59](https://github.com/influxdb/influxdb/issues/59). Add histogram aggregate function ## Bugfixes diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index fcf5c02575..7b110be98f 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -16,8 +16,8 @@ type PointSlice []protocol.Point type Aggregator interface { AggregatePoint(series string, group interface{}, p *protocol.Point) error InitializeFieldsMetadata(series *protocol.Series) error - GetValue(series string, group interface{}) []*protocol.FieldValue - ColumnName() string + GetValues(series string, group interface{}) [][]*protocol.FieldValue + ColumnNames() []string } type AggregatorInitializer func(*parser.Query, *parser.Value) (Aggregator, error) @@ -26,6 +26,7 @@ var registeredAggregators = make(map[string]AggregatorInitializer) func init() { registeredAggregators["count"] = NewCountAggregator + registeredAggregators["histogram"] = NewHistogramAggregator registeredAggregators["derivative"] = NewDerivativeAggregator registeredAggregators["stddev"] = NewStandardDeviationAggregator registeredAggregators["max"] = NewMaxAggregator @@ -70,17 +71,17 @@ func (self *CompositeAggregator) AggregatePoint(series string, group interface{} return self.right.AggregatePoint(series, group, p) } -func (self *CompositeAggregator) ColumnName() string { - return self.left.ColumnName() +func (self *CompositeAggregator) ColumnNames() []string { + return self.left.ColumnNames() } -func (self *CompositeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - values := self.right.GetValue(series, group) +func (self *CompositeAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + values := self.right.GetValues(series, group) for _, v := range values { - point := &protocol.Point{Values: []*protocol.FieldValue{v}} + point := &protocol.Point{Values: v} self.left.AggregatePoint(series, group, point) } - return self.left.GetValue(series, group) + return self.left.GetValues(series, group) } func (self *CompositeAggregator) InitializeFieldsMetadata(series *protocol.Series) error { @@ -135,17 +136,21 @@ func (self *StandardDeviationAggregator) AggregatePoint(series string, group int return nil } -func (self *StandardDeviationAggregator) ColumnName() string { - return "stddev" +func (self *StandardDeviationAggregator) ColumnNames() []string { + return []string{"stddev"} } -func (self *StandardDeviationAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { +func (self *StandardDeviationAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { r := self.running[series][group] eX := r.totalX / float64(r.count) eX *= eX eX2 := r.totalX2 / float64(r.count) standardDeviation := math.Sqrt(eX2 - eX) - return []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &standardDeviation}} + return [][]*protocol.FieldValue{ + []*protocol.FieldValue{ + &protocol.FieldValue{DoubleValue: &standardDeviation}, + }, + } } func NewStandardDeviationAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) { @@ -212,11 +217,11 @@ func (self *DerivativeAggregator) AggregatePoint(series string, group interface{ return nil } -func (self *DerivativeAggregator) ColumnName() string { - return "derivative" +func (self *DerivativeAggregator) ColumnNames() []string { + return []string{"derivative"} } -func (self *DerivativeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { +func (self *DerivativeAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { oldValue := self.firstValues[series][group] newValue := self.lastValues[series][group] @@ -224,7 +229,11 @@ func (self *DerivativeAggregator) GetValue(series string, group interface{}) []* deltaT := float64(*newValue.Timestamp-*oldValue.Timestamp) / float64(time.Second/time.Microsecond) deltaV := *newValue.Values[self.fieldIndex].DoubleValue - *oldValue.Values[self.fieldIndex].DoubleValue derivative := deltaV / deltaT - return []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &derivative}} + return [][]*protocol.FieldValue{ + []*protocol.FieldValue{ + &protocol.FieldValue{DoubleValue: &derivative}, + }, + } } func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) { @@ -245,6 +254,101 @@ func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, erro }, nil } +// +// Histogram Aggregator +// + +type HistogramAggregator struct { + AbstractAggregator + bucketSize float64 + histograms map[string]map[interface{}]map[int]int +} + +func (self *HistogramAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error { + groups := self.histograms[series] + if groups == nil { + groups = make(map[interface{}]map[int]int) + self.histograms[series] = groups + } + + buckets := groups[group] + if buckets == nil { + buckets = make(map[int]int) + groups[group] = buckets + } + + var value float64 + if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil { + value = float64(*ptr) + } else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil { + value = *ptr + } + + bucket := int(value / self.bucketSize) + buckets[bucket] += 1 + + return nil +} + +func (self *HistogramAggregator) ColumnNames() []string { + return []string{"bucket_start", "count"} +} + +func (self *HistogramAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} + buckets := self.histograms[series][group] + for bucket, size := range buckets { + _bucket := float64(bucket) * self.bucketSize + _size := int64(size) + + returnValues = append(returnValues, []*protocol.FieldValue{ + &protocol.FieldValue{DoubleValue: &_bucket}, + &protocol.FieldValue{Int64Value: &_size}, + }) + } + + return returnValues +} + +func NewHistogramAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) { + if len(v.Elems) < 1 { + return nil, common.NewQueryError(common.WrongNumberOfArguments, "function histogram() requires at least one arguments") + } + + if len(v.Elems) > 2 { + return nil, common.NewQueryError(common.WrongNumberOfArguments, "function histogram() takes at most two arguments") + } + + if v.Elems[0].Type == parser.ValueWildcard { + return nil, common.NewQueryError(common.InvalidArgument, "function histogram() doesn't work with wildcards") + } + + bucketSize := 1.0 + + if len(v.Elems) == 2 { + switch v.Elems[1].Type { + case parser.ValueInt, parser.ValueFloat: + var err error + bucketSize, err = strconv.ParseFloat(v.Elems[1].Name, 64) + if err != nil { + return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", v.Elems[1].Name) + } + default: + return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", v.Elems[1].Name) + } + } + + fieldName := v.Elems[0].Name + + return &HistogramAggregator{ + AbstractAggregator: AbstractAggregator{ + fieldName: fieldName, + }, + bucketSize: bucketSize, + histograms: make(map[string]map[interface{}]map[int]int), + }, nil +} + // // Count Aggregator // @@ -263,14 +367,16 @@ func (self *CountAggregator) AggregatePoint(series string, group interface{}, p return nil } -func (self *CountAggregator) ColumnName() string { - return "count" +func (self *CountAggregator) ColumnNames() []string { + return []string{"count"} } -func (self *CountAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - returnValues := []*protocol.FieldValue{} +func (self *CountAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} value := int64(self.counts[series][group]) - returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &value}) + returnValues = append(returnValues, []*protocol.FieldValue{ + &protocol.FieldValue{Int64Value: &value}, + }) return returnValues } @@ -325,14 +431,16 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{} return nil } -func (self *TimestampAggregator) ColumnName() string { - return "count" +func (self *TimestampAggregator) ColumnNames() []string { + return []string{"count"} } -func (self *TimestampAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - returnValues := []*protocol.FieldValue{} +func (self *TimestampAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} value := self.timestamps[series][group] - returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &value}) + returnValues = append(returnValues, []*protocol.FieldValue{ + &protocol.FieldValue{Int64Value: &value}, + }) return returnValues } @@ -397,14 +505,16 @@ func (self *MeanAggregator) AggregatePoint(series string, group interface{}, p * return nil } -func (self *MeanAggregator) ColumnName() string { - return "mean" +func (self *MeanAggregator) ColumnNames() []string { + return []string{"mean"} } -func (self *MeanAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - returnValues := []*protocol.FieldValue{} +func (self *MeanAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} mean := self.means[series][group] - returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &mean}) + returnValues = append(returnValues, []*protocol.FieldValue{ + &protocol.FieldValue{DoubleValue: &mean}, + }) return returnValues } @@ -471,18 +581,20 @@ func (self *PercentileAggregator) AggregatePoint(series string, group interface{ return nil } -func (self *PercentileAggregator) ColumnName() string { - return self.functionName +func (self *PercentileAggregator) ColumnNames() []string { + return []string{self.functionName} } -func (self *PercentileAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - returnValues := []*protocol.FieldValue{} +func (self *PercentileAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} sort.Float64s(self.float_values[series][group]) length := len(self.float_values[series][group]) index := int(math.Floor(float64(length)*self.percentile/100.0+0.5)) - 1 point := self.float_values[series][group][index] - returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &point}) + returnValues = append(returnValues, []*protocol.FieldValue{ + &protocol.FieldValue{DoubleValue: &point}, + }) return returnValues } @@ -546,12 +658,12 @@ func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p * return nil } -func (self *ModeAggregator) ColumnName() string { - return "mode" +func (self *ModeAggregator) ColumnNames() []string { + return []string{"mode"} } -func (self *ModeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - returnValues := []*protocol.FieldValue{} +func (self *ModeAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} values := []float64{} currentCount := 1 @@ -570,7 +682,9 @@ func (self *ModeAggregator) GetValue(series string, group interface{}) []*protoc // we can't use value since we need a pointer to a variable that won't change, // while value will change the next iteration v := value - returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &v}) + returnValues = append(returnValues, []*protocol.FieldValue{ + &protocol.FieldValue{DoubleValue: &v}, + }) } return returnValues @@ -632,24 +746,24 @@ func (self *DistinctAggregator) AggregatePoint(series string, group interface{}, return nil } -func (self *DistinctAggregator) ColumnName() string { - return "distinct" +func (self *DistinctAggregator) ColumnNames() []string { + return []string{"distinct"} } -func (self *DistinctAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - returnValues := []*protocol.FieldValue{} +func (self *DistinctAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} for value, _ := range self.counts[series][group] { switch v := value.(type) { case int: i := int64(v) - returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &i}) + returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{Int64Value: &i}}) case string: - returnValues = append(returnValues, &protocol.FieldValue{StringValue: &v}) + returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{StringValue: &v}}) case bool: - returnValues = append(returnValues, &protocol.FieldValue{BoolValue: &v}) + returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{BoolValue: &v}}) case float64: - returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &v}) + returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &v}}) } } @@ -693,14 +807,14 @@ func (self *CumulativeArithmeticAggregator) AggregatePoint(series string, group return nil } -func (self *CumulativeArithmeticAggregator) ColumnName() string { - return self.name +func (self *CumulativeArithmeticAggregator) ColumnNames() []string { + return []string{self.name} } -func (self *CumulativeArithmeticAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - returnValues := []*protocol.FieldValue{} +func (self *CumulativeArithmeticAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + returnValues := [][]*protocol.FieldValue{} value := self.values[series][group] - returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &value}) + returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &value}}) return returnValues } @@ -781,12 +895,16 @@ func (self *FirstOrLastAggregator) AggregatePoint(series string, group interface return nil } -func (self *FirstOrLastAggregator) ColumnName() string { - return self.name +func (self *FirstOrLastAggregator) ColumnNames() []string { + return []string{self.name} } -func (self *FirstOrLastAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { - return []*protocol.FieldValue{self.values[series][group]} +func (self *FirstOrLastAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue { + return [][]*protocol.FieldValue{ + []*protocol.FieldValue{ + self.values[series][group], + }, + } } func NewFirstOrLastAggregator(name string, v *parser.Value, isFirst bool) (Aggregator, error) { diff --git a/src/engine/engine.go b/src/engine/engine.go index 94d5a4f001..4470b825af 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -192,7 +192,7 @@ func createValuesToInterface(groupBy parser.GroupByClause, fields []string) (Map } } -func crossProduct(values [][]*protocol.FieldValue) [][]*protocol.FieldValue { +func crossProduct(values [][][]*protocol.FieldValue) [][]*protocol.FieldValue { if len(values) == 0 { return [][]*protocol.FieldValue{[]*protocol.FieldValue{}} } @@ -201,7 +201,7 @@ func crossProduct(values [][]*protocol.FieldValue) [][]*protocol.FieldValue { returnValues := [][]*protocol.FieldValue{} for _, v := range values[len(values)-1] { for _, values := range _returnedValues { - returnValues = append(returnValues, append(values, v)) + returnValues = append(returnValues, append(values, v...)) } } return returnValues @@ -219,8 +219,8 @@ func (self SortableGroups) Len() int { } func (self SortableGroups) Less(i, j int) bool { - iTimestamp := self.aggregator.GetValue(self.table, self.data[i])[0].Int64Value - jTimestamp := self.aggregator.GetValue(self.table, self.data[j])[0].Int64Value + iTimestamp := self.aggregator.GetValues(self.table, self.data[i])[0][0].Int64Value + jTimestamp := self.aggregator.GetValues(self.table, self.data[j])[0][0].Int64Value if self.ascending { return *iTimestamp < *jTimestamp } @@ -301,8 +301,8 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database fields := []string{} for _, aggregator := range aggregators { - columnName := aggregator.ColumnName() - fields = append(fields, columnName) + columnNames := aggregator.ColumnNames() + fields = append(fields, columnNames...) } for _, value := range groupBy { @@ -328,17 +328,17 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database sort.Sort(sortedGroups) for _, groupId := range sortedGroups.data { - timestamp := *timestampAggregator.GetValue(table, groupId)[0].Int64Value - values := [][]*protocol.FieldValue{} + timestamp := *timestampAggregator.GetValues(table, groupId)[0][0].Int64Value + values := [][][]*protocol.FieldValue{} for _, aggregator := range aggregators { - values = append(values, aggregator.GetValue(table, groupId)) + values = append(values, aggregator.GetValues(table, groupId)) } // do cross product of all the values - values = crossProduct(values) + _values := crossProduct(values) - for _, v := range values { + for _, v := range _values { /* groupPoints := []*protocol.Point{} */ point := &protocol.Point{ Values: v, diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go index e4b79c4184..3bfb77afab 100644 --- a/src/engine/engine_test.go +++ b/src/engine/engine_test.go @@ -1485,6 +1485,203 @@ func (self *EngineSuite) TestQueryWithMergedTablesWithPointsAppend(c *C) { ]`) } +func (self *EngineSuite) TestHistogramQueryWithGroupByTime(c *C) { + // make the mock coordinator return some data + engine := createEngine(c, ` +[ + { + "points": [ + { + "values": [ + { + "int64_value": 100 + } + ], + "timestamp": 1381346641000000, + "sequence_number": 1 + }, + { + "values": [ + { + "int64_value": 5 + } + ], + "timestamp": 1381346651000000, + "sequence_number": 1 + }, + { + "values": [ + { + "int64_value": 200 + } + ], + "timestamp": 1381346701000000, + "sequence_number": 1 + }, + { + "values": [ + { + "int64_value": 299 + } + ], + "timestamp": 1381346721000000, + "sequence_number": 1 + } + ], + "name": "foo", + "fields": ["column_one"] + } +] +`) + + runQuery(engine, "select histogram(column_one, 100) from foo group by time(1m) order asc", c, `[ + { + "points": [ + { + "values": [ + { + "double_value": 100 + }, + { + "int64_value": 1 + } + ], + "timestamp": 1381346640000000 + }, + { + "values": [ + { + "double_value": 0 + }, + { + "int64_value": 1 + } + ], + "timestamp": 1381346640000000 + }, + { + "values": [ + { + "double_value": 200 + }, + { + "int64_value": 2 + } + ], + "timestamp": 1381346700000000 + } + ], + "name": "foo", + "fields": ["bucket_start", "count"] + } +] +`) +} + +func (self *EngineSuite) TestHistogramQueryWithGroupByTimeAndDefaultBucketSize(c *C) { + // make the mock coordinator return some data + engine := createEngine(c, ` +[ + { + "points": [ + { + "values": [ + { + "int64_value": 100 + } + ], + "timestamp": 1381346641000000, + "sequence_number": 1 + }, + { + "values": [ + { + "int64_value": 5 + } + ], + "timestamp": 1381346651000000, + "sequence_number": 1 + }, + { + "values": [ + { + "int64_value": 200 + } + ], + "timestamp": 1381346701000000, + "sequence_number": 1 + }, + { + "values": [ + { + "int64_value": 299 + } + ], + "timestamp": 1381346721000000, + "sequence_number": 1 + } + ], + "name": "foo", + "fields": ["column_one"] + } +] +`) + + runQuery(engine, "select histogram(column_one) from foo group by time(1m) order asc", c, `[ + { + "points": [ + { + "values": [ + { + "double_value": 100 + }, + { + "int64_value": 1 + } + ], + "timestamp": 1381346640000000 + }, + { + "values": [ + { + "double_value": 5 + }, + { + "int64_value": 1 + } + ], + "timestamp": 1381346640000000 + }, + { + "values": [ + { + "double_value": 200 + }, + { + "int64_value": 1 + } + ], + "timestamp": 1381346700000000 + }, + { + "values": [ + { + "double_value": 299 + }, + { + "int64_value": 1 + } + ], + "timestamp": 1381346700000000 + } + ], + "name": "foo", + "fields": ["bucket_start", "count"] + } +] +`) +} + func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) { err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument") engine := createEngine(c, `[]`)