get rid of executeCountQuery and assume that points belong to the same group if there was no group by clause.

pull/17/head
John Shahid 2013-10-11 14:34:06 -04:00
parent 117a78d57a
commit b7202539b6
1 changed files with 67 additions and 109 deletions

View File

@ -11,6 +11,8 @@ import (
"time"
)
const ALL_GROUP_IDENTIFIER = 1
type QueryEngine struct {
coordinator coordinator.Coordinator
}
@ -22,23 +24,13 @@ func (self *QueryEngine) RunQuery(query *parser.Query, yield func(*protocol.Seri
fmt.Fprintf(os.Stderr, "********************************BUG********************************\n")
buf := make([]byte, 1024)
n := runtime.Stack(buf, false)
fmt.Fprintf(os.Stderr, "Stacktrace: %s\n", string(buf[:n]))
fmt.Fprintf(os.Stderr, "Error: %s. Stacktrace: %s\n", err, string(buf[:n]))
err = common.NewQueryError(common.InternalError, "Internal Error")
}
}()
if isCountQuery(query) {
if groupBy := query.GetGroupByClause(); len(groupBy) > 0 {
return self.executeCountQueryWithGroupBy(query, yield)
} else {
return self.executeCountQuery(query, yield)
}
} else if isFunctionQuery(query) {
if groupBy := query.GetGroupByClause(); len(groupBy) > 0 {
return self.executeFunctionQueryWithGroupBy(query, yield)
} else {
// return self.executeMinQuery(query, yield)
}
return self.executeCountQueryWithGroupBy(query, yield)
} else {
self.coordinator.DistributeQuery(query, yield)
}
@ -59,60 +51,6 @@ func isCountQuery(query *parser.Query) bool {
return false
}
func isFunctionQuery(query *parser.Query) bool {
for _, column := range query.GetColumnNames() {
if column.IsFunctionCall() && (column.Name == "min" || column.Name == "max") {
return true
}
}
return false
}
func (self *QueryEngine) executeCountQuery(query *parser.Query, yield func(*protocol.Series) error) error {
count := make(map[string]int32)
var timestamp int64 = 0
self.coordinator.DistributeQuery(query, func(series *protocol.Series) error {
c := count[*series.Name]
c += int32(len(series.Points))
count[*series.Name] = c
if len(series.Points) > 0 {
timestamp = series.Points[0].GetTimestamp()
}
return nil
})
expectedFieldType := protocol.FieldDefinition_INT32
expectedName := "count"
var sequenceNumber uint32 = 1
for name, value := range count {
tempValue := value
tempName := name
expectedData := &protocol.Series{
Name: &tempName,
Fields: []*protocol.FieldDefinition{
&protocol.FieldDefinition{Name: &expectedName, Type: &expectedFieldType},
},
Points: []*protocol.Point{
&protocol.Point{
Timestamp: &timestamp,
SequenceNumber: &sequenceNumber,
Values: []*protocol.FieldValue{
&protocol.FieldValue{
IntValue: &tempValue,
},
},
},
},
}
yield(expectedData)
}
return nil
}
func getValueFromPoint(value *protocol.FieldValue, fType protocol.FieldDefinition_Type) interface{} {
switch fType {
case protocol.FieldDefinition_STRING:
@ -154,11 +92,20 @@ func createValuesToInterface(groupBy parser.GroupByClause, definitions []*protoc
switch len(names) {
case 0:
if window != nil {
// this must be group by time
return func(p *protocol.Point) interface{} {
return getTimestampFromPoint(*window, p)
}, func(i interface{}, idx int) interface{} {
return i
}, nil
}
// this must be group by time
return func(p *protocol.Point) interface{} {
return getTimestampFromPoint(*window, p)
return ALL_GROUP_IDENTIFIER
}, func(i interface{}, idx int) interface{} {
return i
panic("This should never be called")
}, nil
case 1:
@ -243,8 +190,8 @@ func createValuesToInterface(groupBy parser.GroupByClause, definitions []*protoc
}
func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.Query, yield func(*protocol.Series) error) error {
counts := make(map[interface{}]int32)
timestamps := make(map[interface{}]int64)
counts := make(map[string]map[interface{}]int32)
timestamps := make(map[string]map[interface{}]int64)
groupBy := query.GetGroupByClause()
@ -269,13 +216,22 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.Query, yield
for _, point := range series.Points {
value := mapper(point)
c := counts[value]
counts[value] = c + 1
tableCounts := counts[*series.Name]
if tableCounts == nil {
tableCounts = make(map[interface{}]int32)
counts[*series.Name] = tableCounts
}
tableCounts[value]++
tableTimestamps := timestamps[*series.Name]
if tableTimestamps == nil {
tableTimestamps = make(map[interface{}]int64)
timestamps[*series.Name] = tableTimestamps
}
if duration != nil {
timestamps[value] = getTimestampFromPoint(*duration, point)
tableTimestamps[value] = getTimestampFromPoint(*duration, point)
} else {
timestamps[value] = point.GetTimestamp()
tableTimestamps[value] = point.GetTimestamp()
}
}
@ -291,8 +247,6 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.Query, yield
var sequenceNumber uint32 = 1
/* fields := []*protocol.FieldDefinition{} */
points := []*protocol.Point{}
fields := []*protocol.FieldDefinition{
&protocol.FieldDefinition{Name: &expectedName, Type: &expectedFieldType},
}
@ -306,46 +260,50 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.Query, yield
fields = append(fields, &protocol.FieldDefinition{Name: &tempName, Type: fieldTypes[tempName]})
}
for key, count := range counts {
tempKey := key
tempCount := count
for table, tableCounts := range counts {
tempTable := table
points := []*protocol.Point{}
for key, count := range tableCounts {
tempKey := key
tempCount := count
timestamp := timestamps[tempKey]
timestamp := timestamps[table][tempKey]
point := &protocol.Point{
Timestamp: &timestamp,
SequenceNumber: &sequenceNumber,
Values: []*protocol.FieldValue{
&protocol.FieldValue{
IntValue: &tempCount,
point := &protocol.Point{
Timestamp: &timestamp,
SequenceNumber: &sequenceNumber,
Values: []*protocol.FieldValue{
&protocol.FieldValue{
IntValue: &tempCount,
},
},
},
}
for idx, _ := range groupBy {
value := inverse(tempKey, idx)
switch x := value.(type) {
case string:
point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x})
case int32:
point.Values = append(point.Values, &protocol.FieldValue{IntValue: &x})
case bool:
point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x})
case float64:
point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x})
}
for idx, _ := range groupBy {
value := inverse(tempKey, idx)
switch x := value.(type) {
case string:
point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x})
case int32:
point.Values = append(point.Values, &protocol.FieldValue{IntValue: &x})
case bool:
point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x})
case float64:
point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x})
}
}
points = append(points, point)
}
points = append(points, point)
expectedData := &protocol.Series{
Name: &tempTable,
Fields: fields,
Points: points,
}
yield(expectedData)
}
expectedData := &protocol.Series{
Name: &query.GetFromClause().Name,
Fields: fields,
Points: points,
}
yield(expectedData)
return nil
}