add MERGE support in the engine.

pull/17/head
John Shahid 2013-10-28 15:24:12 -04:00
parent dd3332fa00
commit 23d73f94ed
3 changed files with 214 additions and 2 deletions

View File

@ -34,14 +34,27 @@ func (self *QueryEngine) RunQuery(user common.User, database string, query strin
if err != nil {
return err
}
if isAggregateQuery(q) {
return self.executeCountQueryWithGroupBy(user, database, q, yield)
} else {
self.coordinator.DistributeQuery(user, database, q, yield)
self.distributeQuery(user, database, q, yield)
}
return nil
}
// distribute query and possibly do the merge/join before yielding the points
func (self *QueryEngine) distributeQuery(user common.User, database string, query *parser.Query, yield func(*protocol.Series) error) (err error) {
// see if this is a merge query
fromClause := query.GetFromClause()
if fromClause.Type == parser.FromClauseMerge {
yield = getMergeYield(fromClause.Names[0].Name, fromClause.Names[1].Name, yield)
}
return self.coordinator.DistributeQuery(user, database, query, yield)
}
func NewQueryEngine(c coordinator.Coordinator) (EngineI, error) {
return &QueryEngine{c}, nil
}
@ -205,7 +218,7 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database
var inverse InverseMapper
err = self.coordinator.DistributeQuery(user, database, query, func(series *protocol.Series) error {
err = self.distributeQuery(user, database, query, func(series *protocol.Series) error {
var mapper Mapper
mapper, inverse, err = createValuesToInterface(groupBy, series.Fields)
if err != nil {

View File

@ -847,6 +847,80 @@ func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) {
]`)
}
func (self *EngineSuite) TestQueryWithMergedTables(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }
],
"name": "foo",
"fields": ["value"]
},
{
"points": [
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346705000000, "sequence_number": 1 }
],
"name": "bar",
"fields": ["value"]
},
{
"points": [
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346707000000, "sequence_number": 1 }
],
"name": "foo",
"fields": ["value"]
},
{
"points": [
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346706000000, "sequence_number": 1 }
],
"name": "bar",
"fields": ["value"]
},
{
"points": [],
"name": "foo",
"fields": ["value"]
},
{
"points": [],
"name": "bar",
"fields": ["value"]
}
]`)
runQuery(engine, "select * from foo merge bar;", c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }, null], "timestamp": 1381346701000000, "sequence_number": 1 }
],
"name": "foo_merge_bar",
"fields": ["foo.value", "bar.value"]
},
{
"points": [
{ "values": [null, { "int64_value": 2 }], "timestamp": 1381346705000000, "sequence_number": 1 }
],
"name": "foo_merge_bar",
"fields": ["foo.value", "bar.value"]
},
{
"points": [
{ "values": [null, { "int64_value": 4 }], "timestamp": 1381346706000000, "sequence_number": 1 }
],
"name": "foo_merge_bar",
"fields": ["foo.value", "bar.value"]
},
{
"points": [
{ "values": [{ "int64_value": 3 }, null], "timestamp": 1381346707000000, "sequence_number": 1 }
],
"name": "foo_merge_bar",
"fields": ["foo.value", "bar.value"]
}
]`)
}
func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) {
err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
engine := createEngine(c, `[]`)

125
src/engine/merge.go Normal file
View File

@ -0,0 +1,125 @@
package engine
import (
"protocol"
)
func getMergeYield(table1, table2 string, yield func(*protocol.Series) error) func(*protocol.Series) error {
s1 := []*protocol.Series{}
s2 := []*protocol.Series{}
done1 := false
done2 := false
mergedSeriesName := table1 + "_merge_" + table2
var nullValues1 []*protocol.FieldValue
var nullValues2 []*protocol.FieldValue
var fields1 []string
var fields2 []string
return func(p *protocol.Series) error {
current := &s1
other := &s2
currentDone := &done1
otherDone := &done2
currentNullValues := &nullValues1
otherNullValues := &nullValues2
currentFields := &fields1
otherFields := &fields2
if *p.Name == table2 {
current, other = other, current
currentDone, otherDone = otherDone, currentDone
currentNullValues, otherNullValues = otherNullValues, currentNullValues
currentFields, otherFields = otherFields, currentFields
}
// setup the fields
if *currentFields == nil {
for _, f := range p.Fields {
*currentFields = append(*currentFields, *p.Name+"."+f)
}
for i := 0; i < len(p.Fields); i++ {
*currentNullValues = append(*currentNullValues, nil)
}
}
// data for current table is exhausted
if len(p.Points) == 0 {
*currentDone = true
} else {
*current = append(*current, p)
}
// if data for the other table is exhausted then pass through
if *otherDone && len(*other) == 0 {
for _, s := range *current {
for _, p := range s.Points {
p.Values = append(p.Values, *otherNullValues...)
}
err := yield(&protocol.Series{
Name: &mergedSeriesName,
Fields: append(*currentFields, *otherFields...),
Points: s.Points,
})
if err != nil {
return err
}
}
*current = nil
}
// if data for the current table is exhausted then pass through
if *currentDone && len(*current) == 0 {
for _, s := range *other {
for _, p := range s.Points {
p.Values = append(p.Values, *currentNullValues...)
}
err := yield(&protocol.Series{
Name: &mergedSeriesName,
Fields: append(*otherFields, *currentFields...),
Points: s.Points,
})
if err != nil {
return err
}
}
*other = nil
}
for len(s1) > 0 && len(s2) > 0 && len(s1[0].Points) > 0 && len(s2[0].Points) > 0 {
var points []*protocol.Point
if *s1[0].Points[0].Timestamp > *s2[0].Points[0].Timestamp {
// s1 fields = null, s2 fields = some values
points = s2[0].Points[:1]
for _, p := range points {
p.Values = append(nullValues1, p.Values...)
}
s2[0].Points = s2[0].Points[1:]
if len(s2[0].Points) == 0 {
s2 = s2[1:]
}
} else {
// s1 fields = null, s2 fields = some values
points = s1[0].Points[:1]
for _, p := range points {
p.Values = append(p.Values, nullValues2...)
}
s1[0].Points = s1[0].Points[1:]
if len(s1[0].Points) == 0 {
s1 = s1[1:]
}
}
err := yield(&protocol.Series{
Name: &mergedSeriesName,
Fields: append(fields1, fields2...),
Points: points,
})
if err != nil {
return err
}
}
return nil
}
}