Merge remote-tracking branch 'refs/remotes/origin/pr/973'

pull/671/head
John Shahid 2014-11-03 14:04:58 -05:00
commit 63b11691eb
10 changed files with 159 additions and 45 deletions

View File

@@ -4,6 +4,8 @@
 - [Issue #1068](https://github.com/influxdb/influxdb/issues/1068). Print
   the processor chain when the query is started
+- [Issue #973](https://github.com/influxdb/influxdb/issues/973). Support joining
+  using a regex or list of time series
 
 ### Bugfixes
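For reference, the two new query shapes this entry covers look like the following (series names are illustrative; the regex form is exercised by the TestJoinRegex integration test further down):

    select * from join(foo1, foo2, foo3)
    select * from join(/foo\d+/)

The existing two-table form (from t1 inner join t2) keeps working and, per the grammar change below, now maps to the same FROM_JOIN clause type as the list form.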

View File

@@ -242,6 +242,9 @@ func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writ
         shardIds[i] = s.Id()
     }
     writer, err = engine.NewQueryEngine(writer, q, shardIds)
+    if err != nil {
+        log.Info("Coordinator processor chain: %s", engine.ProcessorChain(writer))
+    }
     return shards, writer, err
 }

View File

@@ -1,6 +1,9 @@
 package engine
 
-import "github.com/influxdb/influxdb/protocol"
+import (
+    "code.google.com/p/log4go"
+    "github.com/influxdb/influxdb/protocol"
+)
 
 type CommonMergeEngine struct {
     merger *Merger
@@ -39,6 +42,7 @@ func (cme *CommonMergeEngine) Close() error {
 }
 
 func (cme *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) {
+    log4go.Fine("CommonMergeEngine.Yield(): %s", s)
     stream := cme.streams[s.GetShardId()]
     stream.Yield(s)
     return cme.merger.Update()

View File

@@ -1,31 +1,44 @@
 package engine
 
 import (
+    "code.google.com/p/log4go"
+
     "github.com/influxdb/influxdb/parser"
     "github.com/influxdb/influxdb/protocol"
 )
 
+// TODO: Explain how JoinEngine works
 type JoinEngine struct {
     query *parser.SelectQuery
     next  Processor
-    table1, table2           string
-    name                     string // the output table name
-    lastPoint1, lastPoint2   *protocol.Point
-    lastFields1, lastFields2 []string
+    name        string // the output table name
+    tableIdx    map[string]int
+    tablesState []joinEngineState
+    pts         int
 }
 
+// Create and return a new JoinEngine given the shards that will be
+// processed and the query
 func NewJoinEngine(shards []uint32, query *parser.SelectQuery, next Processor) Processor {
-    table1 := query.GetFromClause().Names[0].GetAlias()
-    table2 := query.GetFromClause().Names[1].GetAlias()
-    name := table1 + "_join_" + table2
+    tableNames := query.GetFromClause().Names
+    name := query.GetFromClause().GetString()
+    log4go.Debug("NewJoinEngine: shards=%v, query=%s, next=%s, tableNames=%v, name=%s",
+        shards, query.GetQueryString(), next.Name(), tableNames, name)
 
     joinEngine := &JoinEngine{
         next: next,
         name: name,
-        table1: table1,
-        table2: table2,
+        tablesState: make([]joinEngineState, len(tableNames)),
+        tableIdx:    make(map[string]int, len(tableNames)),
         query: query,
+        pts:   0,
     }
 
+    for i, tn := range tableNames {
+        alias := tn.GetAlias()
+        joinEngine.tablesState[i] = joinEngineState{}
+        joinEngine.tableIdx[alias] = i
+    }
+
     mergeEngine := NewCommonMergeEngine(shards, false, query.Ascending, joinEngine)
     return mergeEngine
 }
@@ -39,42 +52,49 @@ func (je *JoinEngine) Close() error {
 }
 
 func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) {
-    if *s.Name == je.table1 {
-        je.lastPoint1 = s.Points[len(s.Points)-1]
-        if je.lastFields1 == nil {
-            for _, f := range s.Fields {
-                je.lastFields1 = append(je.lastFields1, je.table1+"."+f)
-            }
+    log4go.Fine("JoinEngine.Yield(): %s", s)
+    idx := je.tableIdx[s.GetName()]
+    state := &je.tablesState[idx]
+
+    // If the state for this table didn't contain a point already,
+    // increment the number of tables ready to emit a point by
+    // incrementing `pts`
+    if state.lastPoint == nil {
+        je.pts++
+    }
+    state.lastPoint = s.Points[len(s.Points)-1]
+
+    // update the fields for this table. the fields shouldn't change
+    // after the first point, so we only need to set them once
+    if state.lastFields == nil {
+        for _, f := range s.Fields {
+            state.lastFields = append(state.lastFields, s.GetName()+"."+f)
         }
     }
 
-    if *s.Name == je.table2 {
-        je.lastPoint2 = s.Points[len(s.Points)-1]
-        if je.lastFields2 == nil {
-            for _, f := range s.Fields {
-                je.lastFields2 = append(je.lastFields2, je.table2+"."+f)
-            }
-        }
-    }
-
-    if je.lastPoint1 == nil || je.lastPoint2 == nil {
+    log4go.Fine("JoinEngine: pts = %d", je.pts)
+
+    // if the number of tables ready to emit a point isn't equal to the
+    // total number of tables being joined, then return
+    if je.pts != len(je.tablesState) {
         return true, nil
     }
 
+    // we arbitrarily use the timestamp of the first table's point as
+    // the timestamp of the resulting point. maybe we should use the
+    // smallest (or largest) timestamp.
+    ts := je.tablesState[0].lastPoint.Timestamp
+
     newSeries := &protocol.Series{
         Name:   &je.name,
-        Fields: append(je.lastFields1, je.lastFields2...),
+        Fields: je.fields(),
         Points: []*protocol.Point{
             {
-                Values:    append(je.lastPoint1.Values, je.lastPoint2.Values...),
-                Timestamp: je.lastPoint2.Timestamp,
+                Timestamp: ts,
+                Values:    je.values(),
             },
         },
     }
 
-    je.lastPoint1 = nil
-    je.lastPoint2 = nil
+    // filter the point. the user may have a where clause with the join,
+    // e.g. `select * from join(foo1, foo2) where foo1.val > 10`. we
+    // can't evaluate the where clause until after join happens
     filteredSeries, err := Filter(je.query, newSeries)
     if err != nil {
         return false, err
@@ -89,3 +109,33 @@ func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) {
 func (self *JoinEngine) Next() Processor {
     return self.next
 }
+
+// private
+
+type joinEngineState struct {
+    lastFields []string
+    lastPoint  *protocol.Point
+}
+
+// Returns the field names from all tables appended together
+func (je *JoinEngine) fields() []string {
+    fs := []string{}
+    for _, s := range je.tablesState {
+        fs = append(fs, s.lastFields...)
+    }
+    return fs
+}
+
+// Returns the field values from all tables appended together
+func (je *JoinEngine) values() []*protocol.FieldValue {
+    vs := make([]*protocol.FieldValue, 0)
+    for i := range je.tablesState {
+        // Take the address of the slice element, since we set lastPoint
+        // to nil
+        s := &je.tablesState[i]
+        vs = append(vs, s.lastPoint.Values...)
+        s.lastPoint = nil
+    }
+    je.pts = 0
+    return vs
+}
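To make the new bookkeeping easier to follow outside the diff: every joined table gets a slot in tablesState, pts counts how many slots currently hold a buffered point, and a joined row is emitted (and the slots cleared) only once every table has contributed. The sketch below is a minimal, self-contained rendering of that idea with toy types; joiner, point and tableState are invented names for illustration, not the engine's real protocol structs.

package main

import "fmt"

// point stands in for protocol.Point: one timestamp plus its field values.
type point struct {
    timestamp int64
    values    []float64
}

// tableState mirrors joinEngineState: the last point buffered for one of
// the joined tables (field-name tracking is omitted here).
type tableState struct {
    last *point
}

// joiner keeps one slot per joined table plus a count of filled slots,
// the same bookkeeping JoinEngine does with tableIdx, tablesState and pts.
type joiner struct {
    idx    map[string]int // table name -> slot index
    states []tableState
    ready  int // number of tables with a buffered point (JoinEngine's pts)
}

func newJoiner(tables []string) *joiner {
    j := &joiner{idx: map[string]int{}, states: make([]tableState, len(tables))}
    for i, t := range tables {
        j.idx[t] = i
    }
    return j
}

// yield buffers the latest point for table; once every table has one it
// emits a joined row and clears the slots, like JoinEngine.Yield plus values().
func (j *joiner) yield(table string, p point) (row []float64, ok bool) {
    slot := &j.states[j.idx[table]]
    if slot.last == nil {
        j.ready++
    }
    slot.last = &p
    if j.ready != len(j.states) {
        return nil, false // still waiting on at least one other table
    }
    for i := range j.states {
        st := &j.states[i]
        row = append(row, st.last.values...)
        st.last = nil // force a fresh point from every table for the next row
    }
    j.ready = 0
    return row, true
}

func main() {
    j := newJoiner([]string{"foo1", "foo2"})
    fmt.Println(j.yield("foo1", point{timestamp: 1, values: []float64{1}})) // [] false
    fmt.Println(j.yield("foo2", point{timestamp: 1, values: []float64{3}})) // [1 3] true
}

Clearing each slot and resetting the counter after a row is emitted is what forces the engine to wait for a fresh point from every table before it produces the next joined row.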

View File

@@ -549,6 +549,41 @@ func (self *DataTestSuite) TestWhereAndArithmetic(c *C) {
     c.Assert(maps[0]["expr0"], Equals, 2.0)
 }
 
+// issue #524
+func (self *DataTestSuite) TestJoinRegex(c *C) {
+    t1 := time.Now().Truncate(time.Hour).Add(-4 * time.Hour)
+    t2 := t1.Add(time.Hour)
+    data := fmt.Sprintf(`[
+  {
+    "name":"foo1",
+    "columns":["time", "val"],
+    "points":[[%d, 1],[%d, 2]]
+  },
+  {
+    "name":"foo2",
+    "columns":["time", "val"],
+    "points":[[%d, 3],[%d, 4]]
+  },
+  {
+    "name":"foo3",
+    "columns":["time", "val"],
+    "points":[[%d, 5],[%d, 6]]
+  }]`, t1.Unix(), t2.Unix(), t1.Unix(), t2.Unix(), t1.Unix(), t2.Unix())
+    self.client.WriteJsonData(data, c, "s")
+    serieses := self.client.RunQuery("select * from join(/foo\\d+/)", c, "m")
+    c.Assert(serieses, HasLen, 1)
+    maps := ToMap(serieses[0])
+    c.Assert(maps, HasLen, 2)
+    c.Assert(maps[0]["foo1.val"], Equals, 2.0)
+    c.Assert(maps[0]["foo2.val"], Equals, 4.0)
+    c.Assert(maps[0]["foo3.val"], Equals, 6.0)
+    c.Assert(maps[1]["foo1.val"], Equals, 1.0)
+    c.Assert(maps[1]["foo2.val"], Equals, 3.0)
+    c.Assert(maps[1]["foo3.val"], Equals, 5.0)
+}
+
 // issue #524
 func (self *DataTestSuite) TestJoinAndArithmetic(c *C) {
     t1 := time.Now().Truncate(time.Hour).Add(-4 * time.Hour)
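Spelled out, the assertions in TestJoinRegex above expect a single joined series with two rows, newest first (the query does not ask for ascending order), with columns named table.field by the join engine:

    foo1.val  foo2.val  foo3.val
    2.0       4.0       6.0       (the t2 points)
    1.0       3.0       5.0       (the t1 points)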

View File

@@ -15,8 +15,9 @@ type FromClauseType int
 const (
     FromClauseArray      FromClauseType = C.FROM_ARRAY
     FromClauseMerge      FromClauseType = C.FROM_MERGE
-    FromClauseInnerJoin  FromClauseType = C.FROM_INNER_JOIN
+    FromClauseInnerJoin  FromClauseType = C.FROM_JOIN
     FromClauseMergeRegex FromClauseType = C.FROM_MERGE_REGEX
+    FromClauseJoinRegex  FromClauseType = C.FROM_JOIN_REGEX
 )
 
 func (self *TableName) GetAlias() string {

View File

@@ -473,7 +473,7 @@ func GetFromClause(fromClause *C.from_clause) (*FromClause, error) {
     var regex *regexp.Regexp
     switch t {
-    case FromClauseMergeRegex:
+    case FromClauseMergeRegex, FromClauseJoinRegex:
         val, err := GetValue(fromClause.regex_value)
         if err != nil {
             return nil, err

View File

@@ -450,9 +450,22 @@ FROM_CLAUSE:
             $$->names->elems[1] = malloc(sizeof(table_name));
             $$->names->elems[1]->name = $6;
             $$->names->elems[1]->alias = $7;
-            $$->from_clause_type = FROM_INNER_JOIN;
+            $$->from_clause_type = FROM_JOIN;
+        }
+        |
+        FROM JOIN '(' SIMPLE_TABLE_VALUES ')'
+        {
+            $$ = calloc(1, sizeof(from_clause));
+            $$->names = $4;
+            $$->from_clause_type = FROM_JOIN;
+        }
+        |
+        FROM JOIN '(' REGEX_VALUE ')'
+        {
+            $$ = calloc(1, sizeof(from_clause));
+            $$->from_clause_type = FROM_JOIN_REGEX;
+            $$->regex_value = $4;
         }
 
 WHERE_CLAUSE:
         WHERE CONDITION
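Taken together, the three alternatives map the join syntaxes onto clause types as follows (table names illustrative):

    from t1 inner join t2    -> FROM_JOIN        (exactly two names, as before)
    from join(t1, t2, t3)    -> FROM_JOIN        (any number of names)
    from join(/regex/)       -> FROM_JOIN_REGEX  (regex kept in regex_value; expanded into concrete names by the query rewriter in the last file below)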

View File

@@ -72,8 +72,9 @@ typedef struct {
   enum {
     FROM_ARRAY,
     FROM_MERGE,
-    FROM_INNER_JOIN,
-    FROM_MERGE_REGEX
+    FROM_JOIN,
+    FROM_MERGE_REGEX,
+    FROM_JOIN_REGEX,
   } from_clause_type;
   // in case of merge or join, it's guaranteed that the names array
   // will have two table names only and they aren't regex.

View File

@@ -12,13 +12,18 @@ type RegexMatcher func(r *regexp.Regexp) []string
 // the query will be rewritten to
 // select * from merge(foobar, foobaz)
 func RewriteMergeQuery(query *SelectQuery, rm RegexMatcher) {
-    if query.FromClause.Type != FromClauseMergeRegex {
+    resultFromClauseType := FromClauseMerge
+    switch query.FromClause.Type {
+    case FromClauseMergeRegex:
+    case FromClauseJoinRegex:
+        resultFromClauseType = FromClauseInnerJoin
+    default:
         return
     }
 
     series := rm(query.FromClause.Regex)
     f := query.FromClause
-    f.Type = FromClauseMerge
+    f.Type = resultFromClauseType
     f.Regex = nil
     for _, s := range series {
         f.Names = append(f.Names, &TableName{
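For context on how join(/regex/) reaches the JoinEngine at all: the rewriter above expands the regex into concrete series names and switches the clause type to a plain join, so the engine only ever sees a list of tables. Below is a stripped-down sketch of that expansion using toy types; fromClause, clauseType and rewrite are invented stand-ins for the parser's real SelectQuery/FromClause machinery.

package main

import (
    "fmt"
    "regexp"
)

type clauseType int

const (
    clauseMerge clauseType = iota
    clauseJoin
    clauseMergeRegex
    clauseJoinRegex
)

// fromClause is a toy stand-in for the parser's from clause.
type fromClause struct {
    typ   clauseType
    regex *regexp.Regexp
    names []string
}

// rewrite expands a regex from-clause into a concrete merge/join over the
// series names the matcher returns, mirroring what RewriteMergeQuery does.
func rewrite(f *fromClause, match func(*regexp.Regexp) []string) {
    result := clauseMerge
    switch f.typ {
    case clauseMergeRegex:
    case clauseJoinRegex:
        result = clauseJoin
    default:
        return // not a regex clause; nothing to rewrite
    }
    f.names = append(f.names, match(f.regex)...)
    f.typ = result
    f.regex = nil
}

func main() {
    series := []string{"foo1", "foo2", "bar"}
    match := func(r *regexp.Regexp) []string {
        var out []string
        for _, s := range series {
            if r.MatchString(s) {
                out = append(out, s)
            }
        }
        return out
    }
    f := &fromClause{typ: clauseJoinRegex, regex: regexp.MustCompile(`foo\d+`)}
    rewrite(f, match)
    fmt.Println(f.typ == clauseJoin, f.names) // prints: true [foo1 foo2]
}

The empty FromClauseMergeRegex case in the real code (mirrored by clauseMergeRegex here) keeps the default merge result, FromClauseJoinRegex switches it to FromClauseInnerJoin, and any other clause type returns without rewriting.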