diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index 5f8be2135b..fce8a87844 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -26,6 +26,7 @@ var registeredAggregators = make(map[string]AggregatorInitializer) func init() { registeredAggregators["count"] = NewCountAggregator + registeredAggregators["derivative"] = NewDerivativeAggregator registeredAggregators["max"] = NewMaxAggregator registeredAggregators["min"] = NewMinAggregator registeredAggregators["sum"] = NewSumAggregator @@ -70,6 +71,94 @@ func NewCompositeAggregator(left, right Aggregator) (Aggregator, error) { return &CompositeAggregator{left, right}, nil } +// +// Derivative Aggregator +// + +type DerivativeAggregator struct { + fieldIndex int + fieldName string + lastValues map[string]map[interface{}]*protocol.Point + points map[string]map[interface{}][]*protocol.FieldValue +} + +func (self *DerivativeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error { + lastValues := self.lastValues[series] + if lastValues == nil { + lastValues = make(map[interface{}]*protocol.Point) + self.lastValues[series] = lastValues + } + + var value float64 + if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil { + value = float64(*ptr) + } else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil { + value = *ptr + } else { + // else ignore this point + return nil + } + + newValue := &protocol.Point{ + Timestamp: p.Timestamp, + SequenceNumber: p.SequenceNumber, + Values: []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &value}}, + } + + var oldValue *protocol.Point + oldValue, lastValues[group] = lastValues[group], newValue + if oldValue == nil { + return nil + } + + // if an old value exist, then compute the derivative and insert it in the points slice + deltaT := float64(*newValue.Timestamp-*oldValue.Timestamp) / float64(time.Second/time.Microsecond) + deltaV := *newValue.Values[self.fieldIndex].DoubleValue - *oldValue.Values[self.fieldIndex].DoubleValue + derivative := deltaV / deltaT + points := self.points[series] + if points == nil { + points = make(map[interface{}][]*protocol.FieldValue) + self.points[series] = points + } + points[group] = append(points[group], &protocol.FieldValue{DoubleValue: &derivative}) + return nil +} + +func (self *DerivativeAggregator) ColumnName() string { + return "derivative" +} + +func (self *DerivativeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue { + return self.points[series][group] +} + +func (self *DerivativeAggregator) InitializeFieldsMetadata(series *protocol.Series) error { + for idx, field := range series.Fields { + if field == self.fieldName { + self.fieldIndex = idx + return nil + } + } + + return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName)) +} + +func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) { + if len(v.Elems) != 1 { + return nil, common.NewQueryError(common.WrongNumberOfArguments, "function derivative() requires exactly one argument") + } + + if v.Elems[0].Type == parser.ValueWildcard { + return nil, common.NewQueryError(common.InvalidArgument, "function derivative() doesn't work with wildcards") + } + + return &DerivativeAggregator{ + fieldName: v.Elems[0].Name, + lastValues: make(map[string]map[interface{}]*protocol.Point), + points: make(map[string]map[interface{}][]*protocol.FieldValue), + }, nil +} + // // Count Aggregator // diff --git a/src/engine/engine.go b/src/engine/engine.go index 14f21e6df2..618dbcd75f 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -190,6 +190,21 @@ func createValuesToInterface(groupBy parser.GroupByClause, fields []string) (Map } } +func crossProduct(values [][]*protocol.FieldValue) [][]*protocol.FieldValue { + if len(values) == 0 { + return [][]*protocol.FieldValue{[]*protocol.FieldValue{}} + } + + _returnedValues := crossProduct(values[:len(values)-1]) + returnValues := [][]*protocol.FieldValue{} + for _, v := range values[len(values)-1] { + for _, values := range _returnedValues { + returnValues = append(returnValues, append(values, v)) + } + } + return returnValues +} + func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database string, query *parser.Query, yield func(*protocol.Series) error) error { duration, err := query.GetGroupByClause().GetGroupByTime() @@ -279,48 +294,46 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database points := []*protocol.Point{} for groupId, _ := range tableGroups { timestamp := *timestampAggregator.GetValue(table, groupId)[0].Int64Value - /* groupPoints := []*protocol.Point{} */ - point := &protocol.Point{ - SequenceNumber: &sequenceNumber, - Values: []*protocol.FieldValue{}, - } - point.SetTimestampInMicroseconds(timestamp) + values := [][]*protocol.FieldValue{} for _, aggregator := range aggregators { - // point.Values = append(point.Values, aggregator.GetValue(table, groupId)[0]) - returnValues := aggregator.GetValue(table, groupId) - returnDepth := len(returnValues) - for _, value := range returnValues { - if returnDepth > 1 { - // do some crazy shit - } else { - point.Values = append(point.Values, value) + values = append(values, aggregator.GetValue(table, groupId)) + } + + // do cross product of all the values + values = crossProduct(values) + + for _, v := range values { + /* groupPoints := []*protocol.Point{} */ + point := &protocol.Point{ + SequenceNumber: &sequenceNumber, + Values: v, + } + point.SetTimestampInMicroseconds(timestamp) + + // FIXME: this should be looking at the fields slice not the group by clause + // FIXME: we should check whether the selected columns are in the group by clause + for idx, _ := range groupBy { + if duration != nil && idx == 0 { + continue + } + + value := inverse(groupId, idx) + + switch x := value.(type) { + case string: + point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x}) + case bool: + point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x}) + case float64: + point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x}) + case int64: + point.Values = append(point.Values, &protocol.FieldValue{Int64Value: &x}) } } + + points = append(points, point) } - - // FIXME: this should be looking at the fields slice not the group by clause - // FIXME: we should check whether the selected columns are in the group by clause - for idx, _ := range groupBy { - if duration != nil && idx == 0 { - continue - } - - value := inverse(groupId, idx) - - switch x := value.(type) { - case string: - point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x}) - case bool: - point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x}) - case float64: - point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x}) - case int64: - point.Values = append(point.Values, &protocol.FieldValue{Int64Value: &x}) - } - } - - points = append(points, point) } expectedData := &protocol.Series{ Name: &tempTable,