Add support for group by clauses.
parent
4a299ce5d7
commit
0c4ea6ce92
|
@ -12,7 +12,11 @@ type QueryEngine struct {
|
|||
|
||||
func (self *QueryEngine) RunQuery(query *parser.Query, yield func(*protocol.Series) error) error {
|
||||
if isCountQuery(query) {
|
||||
return self.executeCountQuery(query, yield)
|
||||
if groupBy := query.GetGroupByClause(); len(groupBy) > 0 {
|
||||
return self.executeCountQueryWithGroupBy(query, yield)
|
||||
} else {
|
||||
return self.executeCountQuery(query, yield)
|
||||
}
|
||||
} else {
|
||||
self.coordinator.DistributeQuery(query, yield)
|
||||
}
|
||||
|
@ -76,3 +80,83 @@ func (self *QueryEngine) executeCountQuery(query *parser.Query, yield func(*prot
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.Query, yield func(*protocol.Series) error) error {
|
||||
counts := make(map[interface{}]int32)
|
||||
var timestamp int64 = 0
|
||||
groupBy := query.GetGroupByClause()[0]
|
||||
var columnType protocol.FieldDefinition_Type
|
||||
|
||||
self.coordinator.DistributeQuery(query, func(series *protocol.Series) error {
|
||||
var columnIndex int
|
||||
|
||||
for index, value := range series.Fields {
|
||||
if *value.Name == groupBy.Name {
|
||||
columnIndex = index
|
||||
columnType = *value.Type
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for _, point := range series.Points {
|
||||
var value interface{}
|
||||
|
||||
switch columnType {
|
||||
case protocol.FieldDefinition_STRING:
|
||||
value = *point.Values[columnIndex].StringValue
|
||||
case protocol.FieldDefinition_INT32:
|
||||
value = *point.Values[columnIndex].IntValue
|
||||
case protocol.FieldDefinition_DOUBLE:
|
||||
value = *point.Values[columnIndex].DoubleValue
|
||||
case protocol.FieldDefinition_BOOL:
|
||||
value = *point.Values[columnIndex].BoolValue
|
||||
}
|
||||
|
||||
c := counts[value]
|
||||
counts[value] = c + 1
|
||||
}
|
||||
|
||||
if len(series.Points) > 0 {
|
||||
timestamp = series.Points[0].GetTimestamp()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
expectedFieldType := protocol.FieldDefinition_INT32
|
||||
expectedName := "count"
|
||||
var sequenceNumber uint32 = 1
|
||||
|
||||
/* fields := []*protocol.FieldDefinition{} */
|
||||
points := []*protocol.Point{}
|
||||
|
||||
fields := []*protocol.FieldDefinition{
|
||||
&protocol.FieldDefinition{Name: &expectedName, Type: &expectedFieldType},
|
||||
&protocol.FieldDefinition{Name: &groupBy.Name, Type: &columnType},
|
||||
}
|
||||
|
||||
for key, count := range counts {
|
||||
tempKey := key.(string)
|
||||
tempCount := count
|
||||
|
||||
points = append(points, &protocol.Point{
|
||||
Timestamp: ×tamp,
|
||||
SequenceNumber: &sequenceNumber,
|
||||
Values: []*protocol.FieldValue{
|
||||
&protocol.FieldValue{
|
||||
IntValue: &tempCount,
|
||||
},
|
||||
&protocol.FieldValue{
|
||||
StringValue: &tempKey,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
expectedData := &protocol.Series{
|
||||
Name: &query.GetFromClause().Name,
|
||||
Fields: fields,
|
||||
Points: points,
|
||||
}
|
||||
yield(expectedData)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -315,7 +315,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) {
|
|||
]
|
||||
`)
|
||||
|
||||
runQuery(engine, "select count(*), column_one from foo.* group by column_one;", c, `[
|
||||
runQuery(engine, "select count(*), column_one from foo group by column_one;", c, `[
|
||||
{
|
||||
"points": [
|
||||
{
|
||||
|
@ -336,7 +336,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) {
|
|||
"int_value": 1
|
||||
},
|
||||
{
|
||||
"string_value": "some_value"
|
||||
"string_value": "another_value"
|
||||
}
|
||||
],
|
||||
"timestamp": 1381346631,
|
||||
|
|
Loading…
Reference in New Issue