From c0ad5d51047b099b2ebe3148f467559c691a7718 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 29 Oct 2014 17:32:31 -0400 Subject: [PATCH 1/3] Rename some constants (from_clause.go,query.yacc,query_types.h): FROM_INNER_JOIN -> FROM_JOIN --- parser/from_clause.go | 2 +- parser/query.yacc | 2 +- parser/query_types.h | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/parser/from_clause.go b/parser/from_clause.go index 915f798185..917a18d2c9 100644 --- a/parser/from_clause.go +++ b/parser/from_clause.go @@ -15,7 +15,7 @@ type FromClauseType int const ( FromClauseArray FromClauseType = C.FROM_ARRAY FromClauseMerge FromClauseType = C.FROM_MERGE - FromClauseInnerJoin FromClauseType = C.FROM_INNER_JOIN + FromClauseInnerJoin FromClauseType = C.FROM_JOIN FromClauseMergeRegex FromClauseType = C.FROM_MERGE_REGEX ) diff --git a/parser/query.yacc b/parser/query.yacc index 9e256df24b..bc327ecf79 100644 --- a/parser/query.yacc +++ b/parser/query.yacc @@ -450,7 +450,7 @@ FROM_CLAUSE: $$->names->elems[1] = malloc(sizeof(table_name)); $$->names->elems[1]->name = $6; $$->names->elems[1]->alias = $7; - $$->from_clause_type = FROM_INNER_JOIN; + $$->from_clause_type = FROM_JOIN; } diff --git a/parser/query_types.h b/parser/query_types.h index 2c69848a13..fbd5ccb6b3 100644 --- a/parser/query_types.h +++ b/parser/query_types.h @@ -72,8 +72,8 @@ typedef struct { enum { FROM_ARRAY, FROM_MERGE, - FROM_INNER_JOIN, - FROM_MERGE_REGEX + FROM_JOIN, + FROM_MERGE_REGEX, } from_clause_type; // in case of merge or join, it's guaranteed that the names array // will have two table names only and they aren't regex. From 3a857296f4eca0c4f8ccf2da50a8c8138b7baf2d Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 29 Oct 2014 18:24:59 -0400 Subject: [PATCH 2/3] Support joining multiple series using regex or list --- coordinator/coordinator.go | 3 + engine/common_merge_engine.go | 6 +- engine/join_engine.go | 101 ++++++++++++++++++++++------------ integration/data_test.go | 35 ++++++++++++ parser/from_clause.go | 1 + parser/parser.go | 2 +- parser/query.yacc | 15 ++++- parser/query_types.h | 1 + parser/rewrite.go | 9 ++- 9 files changed, 132 insertions(+), 41 deletions(-) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index c3f2233bac..5bdcd81d6a 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -242,6 +242,9 @@ func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writ shardIds[i] = s.Id() } writer, err = engine.NewQueryEngine(writer, q, shardIds) + if err != nil { + log.Info("Coordinator processor chain: %s", engine.ProcessorChain(writer)) + } return shards, writer, err } diff --git a/engine/common_merge_engine.go b/engine/common_merge_engine.go index 76c48029f6..409749f45f 100644 --- a/engine/common_merge_engine.go +++ b/engine/common_merge_engine.go @@ -1,6 +1,9 @@ package engine -import "github.com/influxdb/influxdb/protocol" +import ( + "code.google.com/p/log4go" + "github.com/influxdb/influxdb/protocol" +) type CommonMergeEngine struct { merger *Merger @@ -39,6 +42,7 @@ func (cme *CommonMergeEngine) Close() error { } func (cme *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) { + log4go.Fine("CommonMergeEngine.Yield(): %s", s) stream := cme.streams[s.GetShardId()] stream.Yield(s) return cme.merger.Update() diff --git a/engine/join_engine.go b/engine/join_engine.go index 3779b62f9b..f8f8b73d8d 100644 --- a/engine/join_engine.go +++ b/engine/join_engine.go @@ -1,31 +1,46 @@ package engine import ( + "code.google.com/p/log4go" "github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/protocol" ) +type JoinEngineState struct { + lastFields []string + lastPoint *protocol.Point +} + type JoinEngine struct { - query *parser.SelectQuery - next Processor - table1, table2 string - name string // the output table name - lastPoint1, lastPoint2 *protocol.Point - lastFields1, lastFields2 []string + query *parser.SelectQuery + next Processor + name string // the output table name + tableIdx map[string]int + tableState []JoinEngineState + pts int } func NewJoinEngine(shards []uint32, query *parser.SelectQuery, next Processor) Processor { - table1 := query.GetFromClause().Names[0].GetAlias() - table2 := query.GetFromClause().Names[1].GetAlias() - name := table1 + "_join_" + table2 + tableNames := query.GetFromClause().Names + name := query.GetFromClause().GetString() + log4go.Debug("NewJoinEngine: shards=%v, query=%s, next=%s, tableNames=%v, name=%s", + shards, query.GetQueryString(), next.Name(), tableNames, name) joinEngine := &JoinEngine{ - next: next, - name: name, - table1: table1, - table2: table2, - query: query, + next: next, + name: name, + tableState: make([]JoinEngineState, len(tableNames)), + tableIdx: make(map[string]int, len(tableNames)), + query: query, + pts: 0, } + + for i, tn := range tableNames { + alias := tn.GetAlias() + joinEngine.tableState[i] = JoinEngineState{} + joinEngine.tableIdx[alias] = i + } + mergeEngine := NewCommonMergeEngine(shards, false, query.Ascending, joinEngine) return mergeEngine } @@ -39,42 +54,37 @@ func (je *JoinEngine) Close() error { } func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) { - if *s.Name == je.table1 { - je.lastPoint1 = s.Points[len(s.Points)-1] - if je.lastFields1 == nil { - for _, f := range s.Fields { - je.lastFields1 = append(je.lastFields1, je.table1+"."+f) - } + log4go.Fine("JoinEngine.Yield(): %s", s) + idx := je.tableIdx[s.GetName()] + state := &je.tableState[idx] + if state.lastPoint == nil { + je.pts++ + } + state.lastPoint = s.Points[len(s.Points)-1] + if state.lastFields == nil { + for _, f := range s.Fields { + state.lastFields = append(state.lastFields, s.GetName()+"."+f) } } - if *s.Name == je.table2 { - je.lastPoint2 = s.Points[len(s.Points)-1] - if je.lastFields2 == nil { - for _, f := range s.Fields { - je.lastFields2 = append(je.lastFields2, je.table2+"."+f) - } - } - } - - if je.lastPoint1 == nil || je.lastPoint2 == nil { + log4go.Fine("JoinEngine: pts = %d", je.pts) + if je.pts != len(je.tableState) { return true, nil } + ts := je.tableState[0].lastPoint.Timestamp newSeries := &protocol.Series{ Name: &je.name, - Fields: append(je.lastFields1, je.lastFields2...), + Fields: je.fields(), Points: []*protocol.Point{ { - Values: append(je.lastPoint1.Values, je.lastPoint2.Values...), - Timestamp: je.lastPoint2.Timestamp, + Timestamp: ts, + Values: je.values(), }, }, } - je.lastPoint1 = nil - je.lastPoint2 = nil - + // reset the number of points available for filteredSeries, err := Filter(je.query, newSeries) if err != nil { return false, err @@ -86,6 +96,25 @@ func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) { return true, nil } +func (je *JoinEngine) fields() []string { + fs := []string{} + for _, s := range je.tableState { + fs = append(fs, s.lastFields...) + } + return fs +} + +func (je *JoinEngine) values() []*protocol.FieldValue { + vs := make([]*protocol.FieldValue, 0) + for i := range je.tableState { + s := &je.tableState[i] + vs = append(vs, s.lastPoint.Values...) + s.lastPoint = nil + } + je.pts = 0 + return vs +} + func (self *JoinEngine) Next() Processor { return self.next } diff --git a/integration/data_test.go b/integration/data_test.go index 41b997b402..b70bfc34df 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -549,6 +549,41 @@ func (self *DataTestSuite) TestWhereAndArithmetic(c *C) { c.Assert(maps[0]["expr0"], Equals, 2.0) } +// issue #524 +func (self *DataTestSuite) TestJoinRegex(c *C) { + t1 := time.Now().Truncate(time.Hour).Add(-4 * time.Hour) + t2 := t1.Add(time.Hour) + data := fmt.Sprintf(`[ +{ + "name":"foo1", + "columns":["time", "val"], + "points":[[%d, 1],[%d, 2]] +}, +{ + "name":"foo2", + "columns":["time", "val"], + "points":[[%d, 3],[%d, 4]] + +}, +{ + "name":"foo3", + "columns":["time", "val"], + "points":[[%d, 5],[%d, 6]] + +}]`, t1.Unix(), t2.Unix(), t1.Unix(), t2.Unix(), t1.Unix(), t2.Unix()) + self.client.WriteJsonData(data, c, "s") + serieses := self.client.RunQuery("select * from join(/foo\\d+/)", c, "m") + c.Assert(serieses, HasLen, 1) + maps := ToMap(serieses[0]) + c.Assert(maps, HasLen, 2) + c.Assert(maps[0]["foo1.val"], Equals, 2.0) + c.Assert(maps[0]["foo2.val"], Equals, 4.0) + c.Assert(maps[0]["foo3.val"], Equals, 6.0) + c.Assert(maps[1]["foo1.val"], Equals, 1.0) + c.Assert(maps[1]["foo2.val"], Equals, 3.0) + c.Assert(maps[1]["foo3.val"], Equals, 5.0) +} + // issue #524 func (self *DataTestSuite) TestJoinAndArithmetic(c *C) { t1 := time.Now().Truncate(time.Hour).Add(-4 * time.Hour) diff --git a/parser/from_clause.go b/parser/from_clause.go index 917a18d2c9..f8c4ceeb08 100644 --- a/parser/from_clause.go +++ b/parser/from_clause.go @@ -17,6 +17,7 @@ const ( FromClauseMerge FromClauseType = C.FROM_MERGE FromClauseInnerJoin FromClauseType = C.FROM_JOIN FromClauseMergeRegex FromClauseType = C.FROM_MERGE_REGEX + FromClauseJoinRegex FromClauseType = C.FROM_JOIN_REGEX ) func (self *TableName) GetAlias() string { diff --git a/parser/parser.go b/parser/parser.go index 31830f0dd4..57d4ecb4b6 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -473,7 +473,7 @@ func GetFromClause(fromClause *C.from_clause) (*FromClause, error) { var regex *regexp.Regexp switch t { - case FromClauseMergeRegex: + case FromClauseMergeRegex, FromClauseJoinRegex: val, err := GetValue(fromClause.regex_value) if err != nil { return nil, err diff --git a/parser/query.yacc b/parser/query.yacc index bc327ecf79..5d968fa56f 100644 --- a/parser/query.yacc +++ b/parser/query.yacc @@ -452,7 +452,20 @@ FROM_CLAUSE: $$->names->elems[1]->alias = $7; $$->from_clause_type = FROM_JOIN; } - + | + FROM JOIN '(' SIMPLE_TABLE_VALUES ')' + { + $$ = calloc(1, sizeof(from_clause)); + $$->names = $4; + $$->from_clause_type = FROM_JOIN; + } + | + FROM JOIN '(' REGEX_VALUE ')' + { + $$ = calloc(1, sizeof(from_clause)); + $$->from_clause_type = FROM_JOIN_REGEX; + $$->regex_value = $4; + } WHERE_CLAUSE: WHERE CONDITION diff --git a/parser/query_types.h b/parser/query_types.h index fbd5ccb6b3..eba4f77762 100644 --- a/parser/query_types.h +++ b/parser/query_types.h @@ -74,6 +74,7 @@ typedef struct { FROM_MERGE, FROM_JOIN, FROM_MERGE_REGEX, + FROM_JOIN_REGEX, } from_clause_type; // in case of merge or join, it's guaranteed that the names array // will have two table names only and they aren't regex. diff --git a/parser/rewrite.go b/parser/rewrite.go index b98dab396f..ad175f5f35 100644 --- a/parser/rewrite.go +++ b/parser/rewrite.go @@ -12,13 +12,18 @@ type RegexMatcher func(r *regexp.Regexp) []string // the query will be rewritten to // select * from merge(foobar, foobaz) func RewriteMergeQuery(query *SelectQuery, rm RegexMatcher) { - if query.FromClause.Type != FromClauseMergeRegex { + resultFromClauseType := FromClauseMerge + switch query.FromClause.Type { + case FromClauseMergeRegex: + case FromClauseJoinRegex: + resultFromClauseType = FromClauseInnerJoin + default: return } series := rm(query.FromClause.Regex) f := query.FromClause - f.Type = FromClauseMerge + f.Type = resultFromClauseType f.Regex = nil for _, s := range series { f.Names = append(f.Names, &TableName{ From edd22ce3b8a3cb1d565b905dbb233c44cf7da4b5 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 31 Oct 2014 15:17:45 -0400 Subject: [PATCH 3/3] Add some docs --- engine/join_engine.go | 79 +++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/engine/join_engine.go b/engine/join_engine.go index f8f8b73d8d..15593ef28e 100644 --- a/engine/join_engine.go +++ b/engine/join_engine.go @@ -6,20 +6,18 @@ import ( "github.com/influxdb/influxdb/protocol" ) -type JoinEngineState struct { - lastFields []string - lastPoint *protocol.Point -} - +// TODO: Explain how JoinEngine work type JoinEngine struct { - query *parser.SelectQuery - next Processor - name string // the output table name - tableIdx map[string]int - tableState []JoinEngineState - pts int + query *parser.SelectQuery + next Processor + name string // the output table name + tableIdx map[string]int + tablesState []joinEngineState + pts int } +// Create and return a new JoinEngine given the shards that will be +// processed and the query func NewJoinEngine(shards []uint32, query *parser.SelectQuery, next Processor) Processor { tableNames := query.GetFromClause().Names name := query.GetFromClause().GetString() @@ -27,17 +25,17 @@ func NewJoinEngine(shards []uint32, query *parser.SelectQuery, next Processor) P shards, query.GetQueryString(), next.Name(), tableNames, name) joinEngine := &JoinEngine{ - next: next, - name: name, - tableState: make([]JoinEngineState, len(tableNames)), - tableIdx: make(map[string]int, len(tableNames)), - query: query, - pts: 0, + next: next, + name: name, + tablesState: make([]joinEngineState, len(tableNames)), + tableIdx: make(map[string]int, len(tableNames)), + query: query, + pts: 0, } for i, tn := range tableNames { alias := tn.GetAlias() - joinEngine.tableState[i] = JoinEngineState{} + joinEngine.tablesState[i] = joinEngineState{} joinEngine.tableIdx[alias] = i } @@ -56,11 +54,16 @@ func (je *JoinEngine) Close() error { func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) { log4go.Fine("JoinEngine.Yield(): %s", s) idx := je.tableIdx[s.GetName()] - state := &je.tableState[idx] + state := &je.tablesState[idx] + // If the state for this table didn't contain a point already, + // increment the number of tables ready to emit a point by + // incrementing `pts` if state.lastPoint == nil { je.pts++ } state.lastPoint = s.Points[len(s.Points)-1] + // update the fields for this table. the fields shouldn't change + // after the first point, so we only need to set them once if state.lastFields == nil { for _, f := range s.Fields { state.lastFields = append(state.lastFields, s.GetName()+"."+f) @@ -68,11 +71,16 @@ func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) { } log4go.Fine("JoinEngine: pts = %d", je.pts) - if je.pts != len(je.tableState) { + // if the number of tables ready to emit a point isn't equal to the + // total number of tables being joined, then return + if je.pts != len(je.tablesState) { return true, nil } - ts := je.tableState[0].lastPoint.Timestamp + // we arbitrarily use the timestamp of the first table's point as + // the timestamp of the resulting point. may be we should use the + // smalles (or largest) timestamp. + ts := je.tablesState[0].lastPoint.Timestamp newSeries := &protocol.Series{ Name: &je.name, Fields: je.fields(), @@ -84,7 +92,9 @@ func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) { }, } - // reset the number of points available for + // filter the point. the user may have a where clause with the join, + // e.g. `select * from join(foo1, foo2) where foo1.val > 10`. we + // can't evaluate the where clause until after join happens filteredSeries, err := Filter(je.query, newSeries) if err != nil { return false, err @@ -96,25 +106,36 @@ func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) { return true, nil } +func (self *JoinEngine) Next() Processor { + return self.next +} + +// private + +type joinEngineState struct { + lastFields []string + lastPoint *protocol.Point +} + +// Returns the field names from all tables appended together func (je *JoinEngine) fields() []string { fs := []string{} - for _, s := range je.tableState { + for _, s := range je.tablesState { fs = append(fs, s.lastFields...) } return fs } +// Returns the field values from all tables appended together func (je *JoinEngine) values() []*protocol.FieldValue { vs := make([]*protocol.FieldValue, 0) - for i := range je.tableState { - s := &je.tableState[i] + for i := range je.tablesState { + // Take the address of the slice element, since we set lastPoint + // to nil + s := &je.tablesState[i] vs = append(vs, s.lastPoint.Values...) s.lastPoint = nil } je.pts = 0 return vs } - -func (self *JoinEngine) Next() Processor { - return self.next -}