From 8686760860346ab3fd68dcdaa845b5b47a4a6f07 Mon Sep 17 00:00:00 2001 From: Todd Persen Date: Fri, 11 Oct 2013 14:19:17 -0400 Subject: [PATCH] Implements min queries. --- src/engine/engine.go | 130 ++++++++++++++++++++++++++++++++++++++ src/engine/engine_test.go | 80 +++++++++++++++++++++++ 2 files changed, 210 insertions(+) diff --git a/src/engine/engine.go b/src/engine/engine.go index c1dbfeddca..f91526d613 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -33,6 +33,12 @@ func (self *QueryEngine) RunQuery(query *parser.Query, yield func(*protocol.Seri } else { return self.executeCountQuery(query, yield) } + } else if isMinQuery(query) { + if groupBy := query.GetGroupByClause(); len(groupBy) > 0 { + return self.executeMinQueryWithGroupBy(query, yield) + } else { + // return self.executeMinQuery(query, yield) + } } else { self.coordinator.DistributeQuery(query, yield) } @@ -53,6 +59,16 @@ func isCountQuery(query *parser.Query) bool { return false } +func isMinQuery(query *parser.Query) bool { + for _, column := range query.GetColumnNames() { + if column.IsFunctionCall() && column.Name == "min" { + return true + } + } + + return false +} + func (self *QueryEngine) executeCountQuery(query *parser.Query, yield func(*protocol.Series) error) error { count := make(map[string]int32) var timestamp int64 = 0 @@ -332,3 +348,117 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.Query, yield yield(expectedData) return nil } + +func (self *QueryEngine) executeMinQueryWithGroupBy(query *parser.Query, yield func(*protocol.Series) error) error { + mins := make(map[interface{}]int32) + timestamps := make(map[interface{}]int64) + var minField string + var inverse InverseMapper + + groupBy := query.GetGroupByClause() + + for _, column := range query.GetColumnNames() { + if column.IsFunctionCall() && column.Name == "min" { + minField = column.Elems[0].Name + } + } + + fieldTypes := map[string]*protocol.FieldDefinition_Type{} + duration, ok := groupBy.GetGroupByTime() + + self.coordinator.DistributeQuery(query, func(series *protocol.Series) error { + var mapper Mapper + mapper, inverse = createValuesToInterface(groupBy, series.Fields) + var fieldIndex int + + for idx, field := range series.Fields { + fieldTypes[*field.Name] = field.Type + if *field.Name == minField { + fieldIndex = idx + } + } + + for _, point := range series.Points { + min := *point.Values[fieldIndex].IntValue + value := mapper(point) + + if oldMin, exists := mins[value]; exists { + if min < oldMin { + mins[value] = min + } + } else { + mins[value] = min + } + + if ok { + timestamps[value] = getTimestampFromPoint(duration, point) + } else { + timestamps[value] = point.GetTimestamp() + } + } + + return nil + }) + + expectedFieldType := protocol.FieldDefinition_INT32 + expectedName := "min" + var sequenceNumber uint32 = 1 + + /* fields := []*protocol.FieldDefinition{} */ + points := []*protocol.Point{} + + fields := []*protocol.FieldDefinition{ + &protocol.FieldDefinition{Name: &expectedName, Type: &expectedFieldType}, + } + + for _, value := range groupBy { + if value.IsFunctionCall() { + continue + } + + tempName := value.Name + fields = append(fields, &protocol.FieldDefinition{Name: &tempName, Type: fieldTypes[tempName]}) + } + + for key, count := range mins { + tempKey := key + tempCount := count + + timestamp := timestamps[tempKey] + + point := &protocol.Point{ + Timestamp: ×tamp, + SequenceNumber: &sequenceNumber, + Values: []*protocol.FieldValue{ + &protocol.FieldValue{ + IntValue: &tempCount, + }, + }, + } + + for idx, _ := range groupBy { + value := inverse(tempKey, idx) + + switch x := value.(type) { + case string: + point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x}) + case int32: + point.Values = append(point.Values, &protocol.FieldValue{IntValue: &x}) + case bool: + point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x}) + case float64: + point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x}) + } + } + + points = append(points, point) + } + + expectedData := &protocol.Series{ + Name: &query.GetFromClause().Name, + Fields: fields, + Points: points, + } + yield(expectedData) + return nil +} diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go index 74ff173d3e..3fa8f5fa0d 100644 --- a/src/engine/engine_test.go +++ b/src/engine/engine_test.go @@ -703,6 +703,86 @@ func (self *EngineSuite) TestCountQueryWithGroupByTimeAndColumn(c *C) { } +func (self *EngineSuite) TestMinQueryWithGroupByTime(c *C) { + // make the mock coordinator return some data + engine := createEngine(c, ` +[ + { + "points": [ + { + "values": [ + { + "int_value": 3 + } + ], + "timestamp": 1381346641, + "sequence_number": 1 + }, + { + "values": [ + { + "int_value": 8 + } + ], + "timestamp": 1381346701, + "sequence_number": 1 + }, + { + "values": [ + { + "int_value": 4 + } + ], + "timestamp": 1381346721, + "sequence_number": 1 + } + ], + "name": "foo", + "fields": [ + { + "type": "STRING", + "name": "column_one" + } + ] + } +] +`) + + runQuery(engine, "select min(column_one) from foo group by time(1m);", c, `[ + { + "points": [ + { + "values": [ + { + "int_value": 3 + } + ], + "timestamp": 1381346640, + "sequence_number": 1 + }, + { + "values": [ + { + "int_value": 4 + } + ], + "timestamp": 1381346700, + "sequence_number": 1 + } + ], + "name": "foo", + "fields": [ + { + "type": "INT32", + "name": "min" + } + ] + } +] +`) + +} + func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) { err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument") engine := createEngine(c, `[]`)