working implementation of INNER JOIN.

pull/17/head
John Shahid 2013-10-29 14:22:11 -04:00
parent 8cdb3db880
commit 1287f1eb5a
3 changed files with 235 additions and 99 deletions

View File

@ -51,6 +51,10 @@ func (self *QueryEngine) distributeQuery(user common.User, database string, quer
yield = getMergeYield(fromClause.Names[0].Name, fromClause.Names[1].Name, yield)
}
if fromClause.Type == parser.FromClauseInnerJoin {
yield = getJoinYield(fromClause.Names[0].Name, fromClause.Names[1].Name, yield)
}
return self.coordinator.DistributeQuery(user, database, query, yield)
}

View File

@ -929,6 +929,66 @@ func (self *EngineSuite) TestQueryWithMergedTables(c *C) {
]`)
}
func (self *EngineSuite) TestQueryWithJoinedTables(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }
],
"name": "foo",
"fields": ["value"]
},
{
"points": [
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346705000000, "sequence_number": 1 }
],
"name": "bar",
"fields": ["value"]
},
{
"points": [
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346706000000, "sequence_number": 1 }
],
"name": "foo",
"fields": ["value"]
},
{
"points": [
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346707000000, "sequence_number": 1 }
],
"name": "bar",
"fields": ["value"]
},
{
"points": [],
"name": "foo",
"fields": ["value"]
},
{
"points": [],
"name": "bar",
"fields": ["value"]
}
]`)
runQuery(engine, "select * from foo inner join bar;", c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }, { "int64_value": 2 }], "timestamp": 1381346705000000, "sequence_number": 1 }
],
"name": "foo_join_bar",
"fields": ["foo.value", "bar.value"]
},
{
"points": [
{ "values": [{ "int64_value": 3 }, { "int64_value": 4 }], "timestamp": 1381346707000000, "sequence_number": 1 }
],
"name": "foo_join_bar",
"fields": ["foo.value", "bar.value"]
}
]`)
}
func (self *EngineSuite) TestQueryWithMergedTablesWithPointsAppend(c *C) {
engine := createEngine(c, `[
{

View File

@ -1,119 +1,191 @@
package engine
import (
"fmt"
"protocol"
)
func getJoinYield(table1, table2 string, yield func(*protocol.Series) error) func(*protocol.Series) error {
var lastPoint1 *protocol.Point
var lastPoint2 *protocol.Point
name := table1 + "_join_" + table2
return mergeYield(table1, table2, func(s *protocol.Series) error {
if *s.Name == table1 {
lastPoint1 = s.Points[len(s.Points)-1]
fmt.Printf("values 1: %#v\n", lastPoint1.Values)
}
if *s.Name == table2 {
lastPoint2 = s.Points[len(s.Points)-1]
fmt.Printf("values 2: %#v\n", lastPoint2.Values)
}
if lastPoint1 == nil || lastPoint2 == nil {
return nil
}
// join the last two points and yield them
newValues := []*protocol.FieldValue{}
for idx, value := range lastPoint1.Values {
if value == nil {
value = lastPoint2.Values[idx]
}
newValues = append(newValues, value)
}
newSeries := &protocol.Series{
Name: &name,
Fields: s.Fields,
Points: []*protocol.Point{
&protocol.Point{
Values: newValues,
Timestamp: lastPoint2.Timestamp,
SequenceNumber: lastPoint2.SequenceNumber,
},
},
}
lastPoint1 = nil
lastPoint2 = nil
return yield(newSeries)
})
}
func getMergeYield(table1, table2 string, yield func(*protocol.Series) error) func(*protocol.Series) error {
s1 := []*protocol.Series{}
s2 := []*protocol.Series{}
done1 := false
done2 := false
mergedSeriesName := table1 + "_merge_" + table2
var nullValues1 []*protocol.FieldValue
var nullValues2 []*protocol.FieldValue
var fields1 []string
var fields2 []string
name := table1 + "_merge_" + table2
return mergeYield(table1, table2, func(s *protocol.Series) error {
s.Name = &name
return yield(s)
})
}
type MergeState struct {
name string
series []*protocol.Series
fields []string
nullValues []*protocol.FieldValue
isLeft bool
done bool
}
func (self *MergeState) hasPoints() bool {
return len(self.series) > 0 && len(self.series[0].Points) > 0
}
func (self *MergeState) isEarlier(other *MergeState) bool {
return *self.series[0].Points[0].Timestamp < *other.series[0].Points[0].Timestamp
}
func (self *MergeState) mergeValues(other *MergeState, p *protocol.Point) {
if self.isLeft {
p.Values = append(p.Values, other.nullValues...)
return
}
p.Values = append(other.nullValues, p.Values...)
}
func (self *MergeState) flushIfOtherStateIsEmpty(other *MergeState, fields []string, yield func(*protocol.Series) error) error {
if other.done && len(other.series) == 0 {
for _, s := range self.series {
for _, p := range s.Points {
self.mergeValues(other, p)
}
err := yield(&protocol.Series{
Name: s.Name,
Fields: fields,
Points: s.Points,
})
if err != nil {
return err
}
}
self.series = nil
}
return nil
}
// update the state, the points belong to this MergeState (i.e. the name of the timeseries matches)
func (self *MergeState) updateState(p *protocol.Series) {
if *p.Name != self.name {
return
}
// setup the fields
if self.fields == nil {
for _, f := range p.Fields {
self.fields = append(self.fields, self.name+"."+f)
}
for i := 0; i < len(p.Fields); i++ {
self.nullValues = append(self.nullValues, nil)
}
}
// data for current table is exhausted
if len(p.Points) == 0 {
self.done = true
} else {
self.series = append(self.series, p)
}
}
// returns a yield function that will sort points from table1 and
// table2 no matter what the order in which they are received.
//
// FIXME: This won't work for queries that are executed in descending order.
func mergeYield(table1, table2 string, yield func(*protocol.Series) error) func(*protocol.Series) error {
state1 := &MergeState{
name: table1,
isLeft: true,
}
state2 := &MergeState{
name: table2,
isLeft: false,
}
return func(p *protocol.Series) error {
current := &s1
other := &s2
currentDone := &done1
otherDone := &done2
currentNullValues := &nullValues1
otherNullValues := &nullValues2
currentFields := &fields1
otherFields := &fields2
state1.updateState(p)
state2.updateState(p)
if *p.Name == table2 {
current, other = other, current
currentDone, otherDone = otherDone, currentDone
currentNullValues, otherNullValues = otherNullValues, currentNullValues
currentFields, otherFields = otherFields, currentFields
mergedFields := append(state1.fields, state2.fields...)
// if one of the states (let's call it s1) is done (i.e. we'll
// receive no more points for that series) then we know that we
// won't get any points for s1 that are older than what's in s2 so
// we can safely flush all s2's points.
if err := state1.flushIfOtherStateIsEmpty(state2, mergedFields, yield); err != nil {
return err
}
if err := state2.flushIfOtherStateIsEmpty(state1, mergedFields, yield); err != nil {
return err
}
// setup the fields
if *currentFields == nil {
for _, f := range p.Fields {
*currentFields = append(*currentFields, *p.Name+"."+f)
}
for i := 0; i < len(p.Fields); i++ {
*currentNullValues = append(*currentNullValues, nil)
}
}
// data for current table is exhausted
if len(p.Points) == 0 {
*currentDone = true
} else {
*current = append(*current, p)
}
// if data for the other table is exhausted then pass through
if *otherDone && len(*other) == 0 {
for _, s := range *current {
for _, p := range s.Points {
p.Values = append(p.Values, *otherNullValues...)
}
err := yield(&protocol.Series{
Name: &mergedSeriesName,
Fields: append(*currentFields, *otherFields...),
Points: s.Points,
})
if err != nil {
return err
}
}
*current = nil
}
// if data for the current table is exhausted then pass through
if *currentDone && len(*current) == 0 {
for _, s := range *other {
for _, p := range s.Points {
p.Values = append(p.Values, *currentNullValues...)
}
err := yield(&protocol.Series{
Name: &mergedSeriesName,
Fields: append(*otherFields, *currentFields...),
Points: s.Points,
})
if err != nil {
return err
}
}
*other = nil
}
for len(s1) > 0 && len(s2) > 0 && len(s1[0].Points) > 0 && len(s2[0].Points) > 0 {
for state1.hasPoints() && state2.hasPoints() {
var points []*protocol.Point
if *s1[0].Points[0].Timestamp > *s2[0].Points[0].Timestamp {
// s1 fields = null, s2 fields = some values
points = []*protocol.Point{s2[0].Points[0]}
for _, p := range points {
p.Values = append(nullValues1, p.Values...)
}
s2[0].Points = s2[0].Points[1:]
if len(s2[0].Points) == 0 {
s2 = s2[1:]
}
state := state1
otherState := state2
if state2.isEarlier(state1) {
state = state2
otherState = state1
}
points = []*protocol.Point{state.series[0].Points[0]}
for _, p := range points {
state.mergeValues(otherState, p)
}
// get rid of that point, or get rid of the entire series
// if this is the last point
if len(state.series[0].Points) == 1 {
state.series = state.series[1:]
} else {
// s1 fields = null, s2 fields = some values
points = []*protocol.Point{s1[0].Points[0]}
for _, p := range points {
p.Values = append(p.Values, nullValues2...)
}
s1[0].Points = s1[0].Points[1:]
if len(s1[0].Points) == 0 {
s1 = s1[1:]
}
state.series[0].Points = state.series[0].Points[1:]
}
err := yield(&protocol.Series{
Name: &mergedSeriesName,
Fields: append(fields1, fields2...),
Name: &state.name,
Fields: mergedFields,
Points: points,
})
if err != nil {