Implements min queries.
parent
9416b22ff5
commit
8686760860
|
@ -33,6 +33,12 @@ func (self *QueryEngine) RunQuery(query *parser.Query, yield func(*protocol.Seri
|
|||
} else {
|
||||
return self.executeCountQuery(query, yield)
|
||||
}
|
||||
} else if isMinQuery(query) {
|
||||
if groupBy := query.GetGroupByClause(); len(groupBy) > 0 {
|
||||
return self.executeMinQueryWithGroupBy(query, yield)
|
||||
} else {
|
||||
// return self.executeMinQuery(query, yield)
|
||||
}
|
||||
} else {
|
||||
self.coordinator.DistributeQuery(query, yield)
|
||||
}
|
||||
|
@ -53,6 +59,16 @@ func isCountQuery(query *parser.Query) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func isMinQuery(query *parser.Query) bool {
|
||||
for _, column := range query.GetColumnNames() {
|
||||
if column.IsFunctionCall() && column.Name == "min" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (self *QueryEngine) executeCountQuery(query *parser.Query, yield func(*protocol.Series) error) error {
|
||||
count := make(map[string]int32)
|
||||
var timestamp int64 = 0
|
||||
|
@ -332,3 +348,117 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.Query, yield
|
|||
yield(expectedData)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *QueryEngine) executeMinQueryWithGroupBy(query *parser.Query, yield func(*protocol.Series) error) error {
|
||||
mins := make(map[interface{}]int32)
|
||||
timestamps := make(map[interface{}]int64)
|
||||
var minField string
|
||||
var inverse InverseMapper
|
||||
|
||||
groupBy := query.GetGroupByClause()
|
||||
|
||||
for _, column := range query.GetColumnNames() {
|
||||
if column.IsFunctionCall() && column.Name == "min" {
|
||||
minField = column.Elems[0].Name
|
||||
}
|
||||
}
|
||||
|
||||
fieldTypes := map[string]*protocol.FieldDefinition_Type{}
|
||||
duration, ok := groupBy.GetGroupByTime()
|
||||
|
||||
self.coordinator.DistributeQuery(query, func(series *protocol.Series) error {
|
||||
var mapper Mapper
|
||||
mapper, inverse = createValuesToInterface(groupBy, series.Fields)
|
||||
var fieldIndex int
|
||||
|
||||
for idx, field := range series.Fields {
|
||||
fieldTypes[*field.Name] = field.Type
|
||||
if *field.Name == minField {
|
||||
fieldIndex = idx
|
||||
}
|
||||
}
|
||||
|
||||
for _, point := range series.Points {
|
||||
min := *point.Values[fieldIndex].IntValue
|
||||
value := mapper(point)
|
||||
|
||||
if oldMin, exists := mins[value]; exists {
|
||||
if min < oldMin {
|
||||
mins[value] = min
|
||||
}
|
||||
} else {
|
||||
mins[value] = min
|
||||
}
|
||||
|
||||
if ok {
|
||||
timestamps[value] = getTimestampFromPoint(duration, point)
|
||||
} else {
|
||||
timestamps[value] = point.GetTimestamp()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
expectedFieldType := protocol.FieldDefinition_INT32
|
||||
expectedName := "min"
|
||||
var sequenceNumber uint32 = 1
|
||||
|
||||
/* fields := []*protocol.FieldDefinition{} */
|
||||
points := []*protocol.Point{}
|
||||
|
||||
fields := []*protocol.FieldDefinition{
|
||||
&protocol.FieldDefinition{Name: &expectedName, Type: &expectedFieldType},
|
||||
}
|
||||
|
||||
for _, value := range groupBy {
|
||||
if value.IsFunctionCall() {
|
||||
continue
|
||||
}
|
||||
|
||||
tempName := value.Name
|
||||
fields = append(fields, &protocol.FieldDefinition{Name: &tempName, Type: fieldTypes[tempName]})
|
||||
}
|
||||
|
||||
for key, count := range mins {
|
||||
tempKey := key
|
||||
tempCount := count
|
||||
|
||||
timestamp := timestamps[tempKey]
|
||||
|
||||
point := &protocol.Point{
|
||||
Timestamp: ×tamp,
|
||||
SequenceNumber: &sequenceNumber,
|
||||
Values: []*protocol.FieldValue{
|
||||
&protocol.FieldValue{
|
||||
IntValue: &tempCount,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for idx, _ := range groupBy {
|
||||
value := inverse(tempKey, idx)
|
||||
|
||||
switch x := value.(type) {
|
||||
case string:
|
||||
point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x})
|
||||
case int32:
|
||||
point.Values = append(point.Values, &protocol.FieldValue{IntValue: &x})
|
||||
case bool:
|
||||
point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x})
|
||||
case float64:
|
||||
point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x})
|
||||
}
|
||||
}
|
||||
|
||||
points = append(points, point)
|
||||
}
|
||||
|
||||
expectedData := &protocol.Series{
|
||||
Name: &query.GetFromClause().Name,
|
||||
Fields: fields,
|
||||
Points: points,
|
||||
}
|
||||
yield(expectedData)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -703,6 +703,86 @@ func (self *EngineSuite) TestCountQueryWithGroupByTimeAndColumn(c *C) {
|
|||
|
||||
}
|
||||
|
||||
func (self *EngineSuite) TestMinQueryWithGroupByTime(c *C) {
|
||||
// make the mock coordinator return some data
|
||||
engine := createEngine(c, `
|
||||
[
|
||||
{
|
||||
"points": [
|
||||
{
|
||||
"values": [
|
||||
{
|
||||
"int_value": 3
|
||||
}
|
||||
],
|
||||
"timestamp": 1381346641,
|
||||
"sequence_number": 1
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
{
|
||||
"int_value": 8
|
||||
}
|
||||
],
|
||||
"timestamp": 1381346701,
|
||||
"sequence_number": 1
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
{
|
||||
"int_value": 4
|
||||
}
|
||||
],
|
||||
"timestamp": 1381346721,
|
||||
"sequence_number": 1
|
||||
}
|
||||
],
|
||||
"name": "foo",
|
||||
"fields": [
|
||||
{
|
||||
"type": "STRING",
|
||||
"name": "column_one"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
`)
|
||||
|
||||
runQuery(engine, "select min(column_one) from foo group by time(1m);", c, `[
|
||||
{
|
||||
"points": [
|
||||
{
|
||||
"values": [
|
||||
{
|
||||
"int_value": 3
|
||||
}
|
||||
],
|
||||
"timestamp": 1381346640,
|
||||
"sequence_number": 1
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
{
|
||||
"int_value": 4
|
||||
}
|
||||
],
|
||||
"timestamp": 1381346700,
|
||||
"sequence_number": 1
|
||||
}
|
||||
],
|
||||
"name": "foo",
|
||||
"fields": [
|
||||
{
|
||||
"type": "INT32",
|
||||
"name": "min"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
`)
|
||||
|
||||
}
|
||||
|
||||
func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) {
|
||||
err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
|
||||
engine := createEngine(c, `[]`)
|
||||
|
|
Loading…
Reference in New Issue