fix #59. Add histogram aggregate function

pull/82/head
John Shahid 2013-11-19 16:13:23 -05:00
parent bfc11d7bdb
commit c3807b7f85
4 changed files with 385 additions and 69 deletions

View File

@ -119,6 +119,7 @@
- [Issue #35](https://github.com/influxdb/influxdb/issues/35). Support table aliases in Join Queries
- [Issue #71](https://github.com/influxdb/influxdb/issues/71). Add WillReturnSingleSeries to the Query
- [Issue #61](https://github.com/influxdb/influxdb/issues/61). Limit should default to 10k
- [Issue #59](https://github.com/influxdb/influxdb/issues/59). Add histogram aggregate function
## Bugfixes

View File

@ -16,8 +16,8 @@ type PointSlice []protocol.Point
type Aggregator interface {
AggregatePoint(series string, group interface{}, p *protocol.Point) error
InitializeFieldsMetadata(series *protocol.Series) error
GetValue(series string, group interface{}) []*protocol.FieldValue
ColumnName() string
GetValues(series string, group interface{}) [][]*protocol.FieldValue
ColumnNames() []string
}
type AggregatorInitializer func(*parser.Query, *parser.Value) (Aggregator, error)
@ -26,6 +26,7 @@ var registeredAggregators = make(map[string]AggregatorInitializer)
func init() {
registeredAggregators["count"] = NewCountAggregator
registeredAggregators["histogram"] = NewHistogramAggregator
registeredAggregators["derivative"] = NewDerivativeAggregator
registeredAggregators["stddev"] = NewStandardDeviationAggregator
registeredAggregators["max"] = NewMaxAggregator
@ -70,17 +71,17 @@ func (self *CompositeAggregator) AggregatePoint(series string, group interface{}
return self.right.AggregatePoint(series, group, p)
}
func (self *CompositeAggregator) ColumnName() string {
return self.left.ColumnName()
func (self *CompositeAggregator) ColumnNames() []string {
return self.left.ColumnNames()
}
func (self *CompositeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
values := self.right.GetValue(series, group)
func (self *CompositeAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
values := self.right.GetValues(series, group)
for _, v := range values {
point := &protocol.Point{Values: []*protocol.FieldValue{v}}
point := &protocol.Point{Values: v}
self.left.AggregatePoint(series, group, point)
}
return self.left.GetValue(series, group)
return self.left.GetValues(series, group)
}
func (self *CompositeAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
@ -135,17 +136,21 @@ func (self *StandardDeviationAggregator) AggregatePoint(series string, group int
return nil
}
func (self *StandardDeviationAggregator) ColumnName() string {
return "stddev"
func (self *StandardDeviationAggregator) ColumnNames() []string {
return []string{"stddev"}
}
func (self *StandardDeviationAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
func (self *StandardDeviationAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
r := self.running[series][group]
eX := r.totalX / float64(r.count)
eX *= eX
eX2 := r.totalX2 / float64(r.count)
standardDeviation := math.Sqrt(eX2 - eX)
return []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &standardDeviation}}
return [][]*protocol.FieldValue{
[]*protocol.FieldValue{
&protocol.FieldValue{DoubleValue: &standardDeviation},
},
}
}
func NewStandardDeviationAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) {
@ -212,11 +217,11 @@ func (self *DerivativeAggregator) AggregatePoint(series string, group interface{
return nil
}
func (self *DerivativeAggregator) ColumnName() string {
return "derivative"
func (self *DerivativeAggregator) ColumnNames() []string {
return []string{"derivative"}
}
func (self *DerivativeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
func (self *DerivativeAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
oldValue := self.firstValues[series][group]
newValue := self.lastValues[series][group]
@ -224,7 +229,11 @@ func (self *DerivativeAggregator) GetValue(series string, group interface{}) []*
deltaT := float64(*newValue.Timestamp-*oldValue.Timestamp) / float64(time.Second/time.Microsecond)
deltaV := *newValue.Values[self.fieldIndex].DoubleValue - *oldValue.Values[self.fieldIndex].DoubleValue
derivative := deltaV / deltaT
return []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &derivative}}
return [][]*protocol.FieldValue{
[]*protocol.FieldValue{
&protocol.FieldValue{DoubleValue: &derivative},
},
}
}
func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) {
@ -245,6 +254,101 @@ func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, erro
}, nil
}
//
// Histogram Aggregator
//
// HistogramAggregator counts, per series and per group, how many points fall
// into each fixed-width bucket of a single field.
type HistogramAggregator struct {
AbstractAggregator
// bucketSize is the bucket width: AggregatePoint divides each value by it
// to pick a bucket index, and GetValues multiplies the index by it to
// report the bucket start.
bucketSize float64
// histograms maps series name -> group -> bucket index -> number of points.
histograms map[string]map[interface{}]map[int]int
}
// AggregatePoint records the point's field value in the histogram kept for the
// given series and group, creating the nested maps on first use.
//
// The value is read from p.Values[self.fieldIndex]: an int64 is converted to
// float64, a double is used as is. The bucket index is
// floor(value / bucketSize), so every bucket is the half-open interval
// [k*bucketSize, (k+1)*bucketSize), for negative values as well as positive
// ones. The method always returns nil.
func (self *HistogramAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
	groups := self.histograms[series]
	if groups == nil {
		groups = make(map[interface{}]map[int]int)
		self.histograms[series] = groups
	}

	buckets := groups[group]
	if buckets == nil {
		buckets = make(map[int]int)
		groups[group] = buckets
	}

	// NOTE(review): a field carrying neither an int64 nor a double leaves
	// value at 0 and is therefore counted in bucket 0 — confirm this is the
	// intended handling of non-numeric fields.
	var value float64
	field := p.Values[self.fieldIndex]
	switch {
	case field.Int64Value != nil:
		value = float64(*field.Int64Value)
	case field.DoubleValue != nil:
		value = *field.DoubleValue
	}

	// Use Floor rather than a bare int() conversion: int() truncates toward
	// zero, which would put values in (-bucketSize, 0) into bucket 0 together
	// with values in [0, bucketSize), making that bucket twice as wide as the
	// others and mislabelling negative values with a bucket start of 0.
	bucket := int(math.Floor(value / self.bucketSize))
	buckets[bucket]++
	return nil
}
// ColumnNames lists the two columns every histogram row carries, in the order
// GetValues emits them: the bucket start followed by the point count.
func (self *HistogramAggregator) ColumnNames() []string {
	names := make([]string, 0, 2)
	names = append(names, "bucket_start", "count")
	return names
}
// GetValues returns one row per bucket recorded for the given series and
// group. Each row holds the bucket start (bucket index times bucketSize) as a
// double, followed by the number of points in the bucket as an int64.
//
// Rows come out in map iteration order, which Go leaves unspecified. When
// nothing was recorded for the series/group the result is an empty, non-nil
// slice.
func (self *HistogramAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
	counts := self.histograms[series][group]
	rows := make([][]*protocol.FieldValue, 0, len(counts))
	for index, count := range counts {
		// Declared inside the loop so each row points at its own variables
		// instead of sharing the loop variables' storage.
		start := float64(index) * self.bucketSize
		total := int64(count)
		row := []*protocol.FieldValue{
			&protocol.FieldValue{DoubleValue: &start},
			&protocol.FieldValue{Int64Value: &total},
		}
		rows = append(rows, row)
	}
	return rows
}
// NewHistogramAggregator builds the aggregator behind a histogram() call.
//
// v.Elems[0] names the field to bucket and must not be a wildcard. The
// optional v.Elems[1] is the bucket width: it must be an int or float literal
// that parses to a finite number greater than zero, and defaults to 1.0 when
// omitted.
//
// It returns an error created with common.NewQueryError when the number of
// arguments is not one or two (WrongNumberOfArguments), or when the field is a
// wildcard or the bucket width is unusable (InvalidArgument).
func NewHistogramAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) {
	if len(v.Elems) < 1 {
		return nil, common.NewQueryError(common.WrongNumberOfArguments, "function histogram() requires at least one arguments")
	}

	if len(v.Elems) > 2 {
		return nil, common.NewQueryError(common.WrongNumberOfArguments, "function histogram() takes at most two arguments")
	}

	if v.Elems[0].Type == parser.ValueWildcard {
		return nil, common.NewQueryError(common.InvalidArgument, "function histogram() doesn't work with wildcards")
	}

	bucketSize := 1.0

	if len(v.Elems) == 2 {
		sizeArg := v.Elems[1]

		switch sizeArg.Type {
		case parser.ValueInt, parser.ValueFloat:
			parsed, err := strconv.ParseFloat(sizeArg.Name, 64)
			if err != nil {
				return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", sizeArg.Name)
			}
			bucketSize = parsed
		default:
			return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a float", sizeArg.Name)
		}

		// AggregatePoint divides every value by bucketSize and converts the
		// quotient to an int. A zero width yields +/-Inf or NaN, whose integer
		// conversion is implementation-dependent, and a negative or infinite
		// width produces meaningless bucket starts. The negated comparison
		// also rejects NaN.
		if !(bucketSize > 0) || math.IsInf(bucketSize, 0) {
			return nil, common.NewQueryError(common.InvalidArgument, "function histogram() requires a positive bucket size, got %s", sizeArg.Name)
		}
	}

	fieldName := v.Elems[0].Name

	return &HistogramAggregator{
		AbstractAggregator: AbstractAggregator{
			fieldName: fieldName,
		},
		bucketSize: bucketSize,
		histograms: make(map[string]map[interface{}]map[int]int),
	}, nil
}
//
// Count Aggregator
//
@ -263,14 +367,16 @@ func (self *CountAggregator) AggregatePoint(series string, group interface{}, p
return nil
}
func (self *CountAggregator) ColumnName() string {
return "count"
func (self *CountAggregator) ColumnNames() []string {
return []string{"count"}
}
func (self *CountAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
func (self *CountAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
value := int64(self.counts[series][group])
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &value})
returnValues = append(returnValues, []*protocol.FieldValue{
&protocol.FieldValue{Int64Value: &value},
})
return returnValues
}
@ -325,14 +431,16 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{}
return nil
}
func (self *TimestampAggregator) ColumnName() string {
return "count"
func (self *TimestampAggregator) ColumnNames() []string {
return []string{"count"}
}
func (self *TimestampAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
func (self *TimestampAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
value := self.timestamps[series][group]
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &value})
returnValues = append(returnValues, []*protocol.FieldValue{
&protocol.FieldValue{Int64Value: &value},
})
return returnValues
}
@ -397,14 +505,16 @@ func (self *MeanAggregator) AggregatePoint(series string, group interface{}, p *
return nil
}
func (self *MeanAggregator) ColumnName() string {
return "mean"
func (self *MeanAggregator) ColumnNames() []string {
return []string{"mean"}
}
func (self *MeanAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
func (self *MeanAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
mean := self.means[series][group]
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &mean})
returnValues = append(returnValues, []*protocol.FieldValue{
&protocol.FieldValue{DoubleValue: &mean},
})
return returnValues
}
@ -471,18 +581,20 @@ func (self *PercentileAggregator) AggregatePoint(series string, group interface{
return nil
}
func (self *PercentileAggregator) ColumnName() string {
return self.functionName
func (self *PercentileAggregator) ColumnNames() []string {
return []string{self.functionName}
}
func (self *PercentileAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
func (self *PercentileAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
sort.Float64s(self.float_values[series][group])
length := len(self.float_values[series][group])
index := int(math.Floor(float64(length)*self.percentile/100.0+0.5)) - 1
point := self.float_values[series][group][index]
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &point})
returnValues = append(returnValues, []*protocol.FieldValue{
&protocol.FieldValue{DoubleValue: &point},
})
return returnValues
}
@ -546,12 +658,12 @@ func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p *
return nil
}
func (self *ModeAggregator) ColumnName() string {
return "mode"
func (self *ModeAggregator) ColumnNames() []string {
return []string{"mode"}
}
func (self *ModeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
func (self *ModeAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
values := []float64{}
currentCount := 1
@ -570,7 +682,9 @@ func (self *ModeAggregator) GetValue(series string, group interface{}) []*protoc
// we can't use value since we need a pointer to a variable that won't change,
// while value will change the next iteration
v := value
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &v})
returnValues = append(returnValues, []*protocol.FieldValue{
&protocol.FieldValue{DoubleValue: &v},
})
}
return returnValues
@ -632,24 +746,24 @@ func (self *DistinctAggregator) AggregatePoint(series string, group interface{},
return nil
}
func (self *DistinctAggregator) ColumnName() string {
return "distinct"
func (self *DistinctAggregator) ColumnNames() []string {
return []string{"distinct"}
}
func (self *DistinctAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
func (self *DistinctAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
for value, _ := range self.counts[series][group] {
switch v := value.(type) {
case int:
i := int64(v)
returnValues = append(returnValues, &protocol.FieldValue{Int64Value: &i})
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{Int64Value: &i}})
case string:
returnValues = append(returnValues, &protocol.FieldValue{StringValue: &v})
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{StringValue: &v}})
case bool:
returnValues = append(returnValues, &protocol.FieldValue{BoolValue: &v})
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{BoolValue: &v}})
case float64:
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &v})
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &v}})
}
}
@ -693,14 +807,14 @@ func (self *CumulativeArithmeticAggregator) AggregatePoint(series string, group
return nil
}
func (self *CumulativeArithmeticAggregator) ColumnName() string {
return self.name
func (self *CumulativeArithmeticAggregator) ColumnNames() []string {
return []string{self.name}
}
func (self *CumulativeArithmeticAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
returnValues := []*protocol.FieldValue{}
func (self *CumulativeArithmeticAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
value := self.values[series][group]
returnValues = append(returnValues, &protocol.FieldValue{DoubleValue: &value})
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &value}})
return returnValues
}
@ -781,12 +895,16 @@ func (self *FirstOrLastAggregator) AggregatePoint(series string, group interface
return nil
}
func (self *FirstOrLastAggregator) ColumnName() string {
return self.name
func (self *FirstOrLastAggregator) ColumnNames() []string {
return []string{self.name}
}
func (self *FirstOrLastAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
return []*protocol.FieldValue{self.values[series][group]}
func (self *FirstOrLastAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
return [][]*protocol.FieldValue{
[]*protocol.FieldValue{
self.values[series][group],
},
}
}
func NewFirstOrLastAggregator(name string, v *parser.Value, isFirst bool) (Aggregator, error) {

View File

@ -192,7 +192,7 @@ func createValuesToInterface(groupBy parser.GroupByClause, fields []string) (Map
}
}
func crossProduct(values [][]*protocol.FieldValue) [][]*protocol.FieldValue {
func crossProduct(values [][][]*protocol.FieldValue) [][]*protocol.FieldValue {
if len(values) == 0 {
return [][]*protocol.FieldValue{[]*protocol.FieldValue{}}
}
@ -201,7 +201,7 @@ func crossProduct(values [][]*protocol.FieldValue) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
for _, v := range values[len(values)-1] {
for _, values := range _returnedValues {
returnValues = append(returnValues, append(values, v))
returnValues = append(returnValues, append(values, v...))
}
}
return returnValues
@ -219,8 +219,8 @@ func (self SortableGroups) Len() int {
}
func (self SortableGroups) Less(i, j int) bool {
iTimestamp := self.aggregator.GetValue(self.table, self.data[i])[0].Int64Value
jTimestamp := self.aggregator.GetValue(self.table, self.data[j])[0].Int64Value
iTimestamp := self.aggregator.GetValues(self.table, self.data[i])[0][0].Int64Value
jTimestamp := self.aggregator.GetValues(self.table, self.data[j])[0][0].Int64Value
if self.ascending {
return *iTimestamp < *jTimestamp
}
@ -301,8 +301,8 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database
fields := []string{}
for _, aggregator := range aggregators {
columnName := aggregator.ColumnName()
fields = append(fields, columnName)
columnNames := aggregator.ColumnNames()
fields = append(fields, columnNames...)
}
for _, value := range groupBy {
@ -328,17 +328,17 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database
sort.Sort(sortedGroups)
for _, groupId := range sortedGroups.data {
timestamp := *timestampAggregator.GetValue(table, groupId)[0].Int64Value
values := [][]*protocol.FieldValue{}
timestamp := *timestampAggregator.GetValues(table, groupId)[0][0].Int64Value
values := [][][]*protocol.FieldValue{}
for _, aggregator := range aggregators {
values = append(values, aggregator.GetValue(table, groupId))
values = append(values, aggregator.GetValues(table, groupId))
}
// do cross product of all the values
values = crossProduct(values)
_values := crossProduct(values)
for _, v := range values {
for _, v := range _values {
/* groupPoints := []*protocol.Point{} */
point := &protocol.Point{
Values: v,

View File

@ -1485,6 +1485,203 @@ func (self *EngineSuite) TestQueryWithMergedTablesWithPointsAppend(c *C) {
]`)
}
// TestHistogramQueryWithGroupByTime runs histogram(column_one, 100) grouped by
// time(1m) over four int64 points and checks the returned rows.
//
// In the minute starting at 1381346640000000 the values 100 and 5 are expected
// in the buckets starting at 100 and 0, one point each. In the minute starting
// at 1381346700000000 the values 200 and 299 are expected to share the bucket
// starting at 200, with a count of 2.
func (self *EngineSuite) TestHistogramQueryWithGroupByTime(c *C) {
// make the mock coordinator return some data
engine := createEngine(c, `
[
{
"points": [
{
"values": [
{
"int64_value": 100
}
],
"timestamp": 1381346641000000,
"sequence_number": 1
},
{
"values": [
{
"int64_value": 5
}
],
"timestamp": 1381346651000000,
"sequence_number": 1
},
{
"values": [
{
"int64_value": 200
}
],
"timestamp": 1381346701000000,
"sequence_number": 1
},
{
"values": [
{
"int64_value": 299
}
],
"timestamp": 1381346721000000,
"sequence_number": 1
}
],
"name": "foo",
"fields": ["column_one"]
}
]
`)
// Expected result: one point per (time group, bucket) pair, each carrying
// the bucket_start and count columns.
runQuery(engine, "select histogram(column_one, 100) from foo group by time(1m) order asc", c, `[
{
"points": [
{
"values": [
{
"double_value": 100
},
{
"int64_value": 1
}
],
"timestamp": 1381346640000000
},
{
"values": [
{
"double_value": 0
},
{
"int64_value": 1
}
],
"timestamp": 1381346640000000
},
{
"values": [
{
"double_value": 200
},
{
"int64_value": 2
}
],
"timestamp": 1381346700000000
}
],
"name": "foo",
"fields": ["bucket_start", "count"]
}
]
`)
}
// TestHistogramQueryWithGroupByTimeAndDefaultBucketSize runs
// histogram(column_one) without a bucket size argument, grouped by time(1m),
// over the four int64 points 100, 5, 200 and 299.
//
// The expected output has one row per input value: bucket_start equals the
// value itself and count is 1, with 100 and 5 in the minute starting at
// 1381346640000000 and 200 and 299 in the minute starting at
// 1381346700000000.
func (self *EngineSuite) TestHistogramQueryWithGroupByTimeAndDefaultBucketSize(c *C) {
// make the mock coordinator return some data
engine := createEngine(c, `
[
{
"points": [
{
"values": [
{
"int64_value": 100
}
],
"timestamp": 1381346641000000,
"sequence_number": 1
},
{
"values": [
{
"int64_value": 5
}
],
"timestamp": 1381346651000000,
"sequence_number": 1
},
{
"values": [
{
"int64_value": 200
}
],
"timestamp": 1381346701000000,
"sequence_number": 1
},
{
"values": [
{
"int64_value": 299
}
],
"timestamp": 1381346721000000,
"sequence_number": 1
}
],
"name": "foo",
"fields": ["column_one"]
}
]
`)
// Expected result: one point per (time group, bucket) pair, each carrying
// the bucket_start and count columns.
runQuery(engine, "select histogram(column_one) from foo group by time(1m) order asc", c, `[
{
"points": [
{
"values": [
{
"double_value": 100
},
{
"int64_value": 1
}
],
"timestamp": 1381346640000000
},
{
"values": [
{
"double_value": 5
},
{
"int64_value": 1
}
],
"timestamp": 1381346640000000
},
{
"values": [
{
"double_value": 200
},
{
"int64_value": 1
}
],
"timestamp": 1381346700000000
},
{
"values": [
{
"double_value": 299
},
{
"int64_value": 1
}
],
"timestamp": 1381346700000000
}
],
"name": "foo",
"fields": ["bucket_start", "count"]
}
]
`)
}
func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) {
err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
engine := createEngine(c, `[]`)