From 4da21ddc32e3f0d402aced7c99d9e433728b133a Mon Sep 17 00:00:00 2001 From: John Shahid Date: Mon, 10 Mar 2014 17:18:40 -0400 Subject: [PATCH 1/6] add some explain queries tests --- src/integration/benchmark_test.go | 171 ++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go index 2f8a75a0f1..9d14697c27 100644 --- a/src/integration/benchmark_test.go +++ b/src/integration/benchmark_test.go @@ -298,6 +298,177 @@ func (self *IntegrationSuite) TestShortPasswords(c *C) { c.Assert(resp.StatusCode, Equals, http.StatusOK) } +func (self *IntegrationSuite) TestExplainsWithPassthrough(c *C) { + data := ` + [{ + "points": [ + ["val1", 2], + ["val1", 3] + ], + "name": "test_explain_passthrough", + "columns": ["val_1", "val_2"] + }]` + c.Assert(self.server.WriteData(data), IsNil) + bs, err := self.server.RunQuery("explain select val_1 from test_explain_passthrough where time > now() - 1h", "m") + c.Assert(err, IsNil) + series := []*SerializedSeries{} + err = json.Unmarshal(bs, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "explain query") + c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0][1], Equals, "QueryEngine") + c.Assert(series[0].Points[0][5], Equals, float64(2.0)) + c.Assert(series[0].Points[0][6], Equals, float64(2.0)) +} + +func (self *IntegrationSuite) TestExplainsWithPassthroughAndLimit(c *C) { + points := []string{} + for i := 0; i < 101; i++ { + points = append(points, fmt.Sprintf(`["val1", %d]`, i)) + } + + data := fmt.Sprintf(` + [{ + "points": [%s], + "name": "test_explain_passthrough_limit", + "columns": ["val_1", "val_2"] + }]`, strings.Join(points, ",")) + + c.Assert(self.server.WriteData(data), IsNil) + bs, err := self.server.RunQuery("explain select val_1 from test_explain_passthrough_limit where time > now() - 1h limit 1", "m") + c.Assert(err, IsNil) + series := []*SerializedSeries{} + err = json.Unmarshal(bs, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "explain query") + c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0][1], Equals, "QueryEngine") + + // we can read at most point-batch-size points, which is set to 100 + // by default + c.Assert(series[0].Points[0][5], Equals, float64(100.0)) + c.Assert(series[0].Points[0][6], Equals, float64(1.0)) +} + +func (self *IntegrationSuite) TestExplainsWithNonLocalAggregator(c *C) { + data := ` + [{ + "points": [ + ["val1", 2], + ["val1", 3], + ["val1", 4] + ], + "name": "test_explain_non_local", + "columns": ["val_1", "val_2"] + }]` + c.Assert(self.server.WriteData(data), IsNil) + bs, err := self.server.RunQuery("explain select count(val_1) from test_explain_non_local where time > now() - 1h", "m") + c.Assert(err, IsNil) + series := []*SerializedSeries{} + err = json.Unmarshal(bs, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "explain query") + c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0][1], Equals, "QueryEngine") + c.Assert(series[0].Points[0][5], Equals, float64(3.0)) + c.Assert(series[0].Points[0][6], Equals, float64(1.0)) +} + +func (self *IntegrationSuite) TestExplainsWithNonLocalAggregatorAndRegex(c *C) { + data := ` + [{ + "points": [ + ["val1", 2], + ["val1", 3], + ["val1", 4] + ], + "name": "test_explain_non_local_regex", + "columns": ["val_1", "val_2"] + }]` + c.Assert(self.server.WriteData(data), IsNil) + bs, err := self.server.RunQuery("explain select count(val_1) from /.*test_explain_non_local_regex.*/ where time > now() - 1h", "m") + c.Assert(err, IsNil) + series := []*SerializedSeries{} + err = json.Unmarshal(bs, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "explain query") + c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0][1], Equals, "QueryEngine") + c.Assert(series[0].Points[0][5], Equals, float64(3.0)) + c.Assert(series[0].Points[0][6], Equals, float64(1.0)) +} + +func (self *IntegrationSuite) TestExplainsWithLocalAggregator(c *C) { + data := ` + [{ + "points": [ + ["val1", 2], + ["val1", 3], + ["val1", 4] + ], + "name": "test_local_aggregator", + "columns": ["val_1", "val_2"] + }]` + c.Assert(self.server.WriteData(data), IsNil) + bs, err := self.server.RunQuery("explain select count(val_1) from test_local_aggregator group by time(1h) where time > now() - 1h", "m") + c.Assert(err, IsNil) + series := []*SerializedSeries{} + err = json.Unmarshal(bs, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "explain query") + c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0][1], Equals, "QueryEngine") + c.Assert(series[0].Points[0][5], Equals, float64(3.0)) + c.Assert(series[0].Points[0][6], Equals, float64(1.0)) +} + +func (self *IntegrationSuite) TestExplainsWithLocalAggregatorAndRegex(c *C) { + data := ` +[ + { + "points": [ + ["val1", 2], + ["val1", 3], + ["val1", 4] + ], + "name": "test_local_aggregator_regex_1", + "columns": ["val_1", "val_2"] + }, + { + "points": [ + ["val1", 2], + ["val1", 3], + ["val1", 4] + ], + "name": "test_local_aggregator_regex_2", + "columns": ["val_1", "val_2"] + } +]` + c.Assert(self.server.WriteData(data), IsNil) + bs, err := self.server.RunQuery("explain select count(val_1) from /.*test_local_aggregator_regex.*/ group by time(1h) where time > now() - 1h", "m") + c.Assert(err, IsNil) + series := []*SerializedSeries{} + err = json.Unmarshal(bs, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "explain query") + c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0][1], Equals, "QueryEngine") + c.Assert(series[0].Points[0][5], Equals, float64(6.0)) + c.Assert(series[0].Points[0][6], Equals, float64(2.0)) +} + func (self *IntegrationSuite) TestMedians(c *C) { for i := 0; i < 3; i++ { err := self.server.WriteData(fmt.Sprintf(` From e2adcf1c581dc5b6074901d11656887ffca2cdb1 Mon Sep 17 00:00:00 2001 From: Pawel Szymanski Date: Sat, 8 Mar 2014 19:28:54 -0800 Subject: [PATCH 2/6] Close #318. Support EXPLAIN queries --- CHANGELOG.md | 2 + src/cluster/shard.go | 13 +-- src/coordinator/coordinator.go | 43 ++++++--- src/coordinator/protobuf_request_handler.go | 3 +- src/engine/aggregator.go | 14 ++- src/engine/engine.go | 102 ++++++++++++++++++-- src/engine/limiter.go | 10 +- src/engine/list_series_engine.go | 11 ++- src/engine/passthrough_engine.go | 66 ++++++++----- src/parser/parser.go | 10 ++ src/parser/parser_test.go | 17 ++++ src/parser/query.lex | 1 + src/parser/query.yacc | 38 +++++++- src/parser/query_spec.go | 7 ++ src/parser/query_types.h | 2 +- src/parser/test_memory_leaks.sh | 3 + src/protocol/protocol.proto | 1 + 17 files changed, 273 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69131d2846..f2cfa84374 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + ## v0.0.1 [2013-10-22] * Initial Release @@ -296,3 +297,4 @@ - [Issue #333](https://github.com/influxdb/influxdb/issues/333). Better error message when password is invalid and don't create the user if the password is invalid +- [Issue #318](https://github.com/influxdb/influxdb/issues/318). Support EXPLAIN queries diff --git a/src/cluster/shard.go b/src/cluster/shard.go index e8355ffad5..71064b5694 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -33,6 +33,12 @@ type QueryProcessor interface { YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool Close() + + // Set by the shard, so EXPLAIN query can know query against which shard is being measured + SetShardInfo(shardId int, shardLocal bool) + + // Let QueryProcessor identify itself. What if it is a spy and we can't check that? + GetName() string } type NewShardData struct { @@ -217,15 +223,10 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo log.Error("Error while creating engine: %s", err) return } - if querySpec.IsExplainQuery() { - processor.SetShardInfo(int(self.Id()), self.IsLocal) - } + processor.SetShardInfo(int(self.Id()), self.IsLocal) } else { maxPointsToBufferBeforeSending := 1000 processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending) - if querySpec.IsExplainQuery() { - processor.SetShardInfo(int(self.Id()), self.IsLocal) - } } } shard, err := self.store.GetOrCreateShard(self.id) diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 9cf224fb26..889b8074c8 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -42,12 +42,13 @@ var ( // shorter constants for readability var ( - dropDatabase = protocol.Request_DROP_DATABASE - queryRequest = protocol.Request_QUERY - endStreamResponse = protocol.Response_END_STREAM - queryResponse = protocol.Response_QUERY - heartbeatResponse = protocol.Response_HEARTBEAT - write = protocol.Request_WRITE + dropDatabase = protocol.Request_DROP_DATABASE + queryRequest = protocol.Request_QUERY + endStreamResponse = protocol.Response_END_STREAM + queryResponse = protocol.Response_QUERY + heartbeatResponse = protocol.Response_HEARTBEAT + explainQueryResponse = protocol.Response_EXPLAIN_QUERY + write = protocol.Request_WRITE ) type SeriesWriter interface { @@ -254,14 +255,17 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, go func() { for { - res := <-responseChan - if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse { + response := <-responseChan + + if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { writer.Close() seriesClosed <- true return } - if res.Series != nil && len(res.Series.Points) > 0 { - writer.Write(res.Series) + if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { + if response.Series != nil && len(response.Series.Points) > 0 { + writer.Write(response.Series) + } } } }() @@ -278,6 +282,7 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri responses := make([]chan *protocol.Response, 0) for _, shard := range shards { responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize) + // We query shards for data and stream them to query processor go shard.Query(querySpec, responseChan) responses = append(responses, responseChan) } @@ -297,10 +302,13 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri } // if we don't have a processor, yield the point to the writer + // this happens if shard took care of the query + // otherwise client will get points from passthrough engine if processor == nil { - log.Debug("WRITING: ", len(response.Series.Points)) - seriesWriter.Write(response.Series) - log.Debug("WRITING (done)") + // If we have EXPLAIN query, we don't write actual points (of response.Type Query) to the client + if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { + seriesWriter.Write(response.Series) + } continue } @@ -308,8 +316,15 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri // the data here if response.Series != nil { log.Debug("YIELDING: ", len(response.Series.Points)) + + // we need to forward message type to PasstrhoughEngine + // this is a bit dirty TODO: refactor it... + if querySpec.IsExplainQuery() && processor.GetName() == "PassthroughEngine" { + + } + processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series) - } + } } log.Debug("DONE: shard: ", shards[i].String()) } diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index 02d4325358..be8f1542b5 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -58,7 +58,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn // the query should always parse correctly since it was parsed at the originating server. queries, err := parser.ParseQuery(*request.Query) if err != nil || len(queries) < 1 { - log.Error("Erorr parsing query: ", err) + log.Error("Error parsing query: ", err) errorMsg := fmt.Sprintf("Cannot find user %s", *request.UserName) response := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &errorMsg, RequestId: request.Id} self.WriteResponse(conn, response) @@ -80,6 +80,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn } shard := self.clusterConfig.GetLocalShardById(*request.ShardId) + querySpec := parser.NewQuerySpec(user, *request.Database, query) responseChan := make(chan *protocol.Response) diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index 45a00bdbf9..654f15429f 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -1,6 +1,6 @@ package engine -import ( +import ( "common" "fmt" "math" @@ -554,7 +554,7 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{} if timestamps == nil { timestamps = make(map[interface{}]int64) self.timestamps[series] = timestamps - } + } if self.duration != nil { timestamps[group] = *p.GetTimestampInMicroseconds() / *self.duration * *self.duration } else { @@ -571,18 +571,19 @@ func (self *TimestampAggregator) AggregateSeries(series string, group interface{ } if len(s.Points) > 0 { if self.duration != nil { - timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() / *self.duration * *self.duration + timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds() / *self.duration * *self.duration } else { - timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() + timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds() } } return nil } + /* //TODO: to be optimized func (self *TimestampAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { //log.Error("Timestamp: ", len(s.Points)) - for _, p := range s.Points { + for _, p := range s.Points { //log.Error("Point: ", p) self.AggregatePoint(series, group, p) } @@ -614,12 +615,9 @@ func NewTimestampAggregator(query *parser.SelectQuery, _ *parser.Value) (Aggrega var durationPtr *int64 - //log.Error("Duration: ", duration) - if duration != nil { newDuration := int64(*duration / time.Microsecond) durationPtr = &newDuration - // log.Error("Woohoo! ", durationPtr) } return &TimestampAggregator{ diff --git a/src/engine/engine.go b/src/engine/engine.go index 60d4c6f68b..0689cff8ce 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -29,17 +29,26 @@ type QueryEngine struct { pointsRange map[string]*PointRange groupBy *parser.GroupByClause aggregateYield func(*protocol.Series) error + explain bool + + // query statistics + runStartTime float64 + runEndTime float64 + pointsRead int64 + pointsWritten int64 + shardId int + shardLocal bool } +var ( + endStreamResponse = protocol.Response_END_STREAM + explainQueryResponse = protocol.Response_EXPLAIN_QUERY +) + const ( POINT_BATCH_SIZE = 64 ) -var ( - responseQuery = protocol.Response_QUERY - responseEndStream = protocol.Response_END_STREAM -) - // distribute query and possibly do the merge/join before yielding the points func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error { // see if this is a merge query @@ -69,11 +78,30 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo where: query.GetWhereCondition(), limiter: NewLimiter(limit), responseChan: responseChan, - seriesToPoints: make(map[string]*protocol.Series), + seriesToPoints: make(map[string]*protocol.Series), + // stats stuff + explain: query.IsExplainQuery(), + runStartTime: 0, + runEndTime: 0, + pointsRead: 0, + pointsWritten: 0, + shardId: 0, + shardLocal: false, //that really doesn't matter if it is not EXPLAIN query + } + + if queryEngine.explain { + queryEngine.runStartTime = float64(time.Now().UnixNano()) / float64(time.Millisecond) } yield := func(series *protocol.Series) error { - response := &protocol.Response{Type: &responseQuery, Series: series} + var response *protocol.Response + + if queryEngine.explain { + //TODO: We may not have to send points, just count them + queryEngine.pointsWritten += int64(len(series.Points)) + } + + response = &protocol.Response{Type: &queryResponse, Series: series} responseChan <- response return nil } @@ -93,6 +121,12 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo return queryEngine, nil } +// Shard will call this method for EXPLAIN query +func (self *QueryEngine) SetShardInfo(shardId int, shardLocal bool) { + self.shardId = shardId + self.shardLocal = shardLocal +} + // Returns false if the query should be stopped (either because of limit or error) func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, point *protocol.Point) (shouldContinue bool) { shouldContinue = true @@ -107,10 +141,17 @@ func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, poi } series.Points = append(series.Points, point) + if self.explain { + self.pointsRead++ + } + return shouldContinue } func (self *QueryEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) (shouldContinue bool) { + if self.explain { + self.pointsRead += int64(len(seriesIncoming.Points)) + } return self.yieldSeriesData(seriesIncoming) } @@ -191,7 +232,14 @@ func (self *QueryEngine) Close() { if self.isAggregateQuery { self.runAggregates() } - response := &protocol.Response{Type: &responseEndStream} + + if self.explain { + self.runEndTime = float64(time.Now().UnixNano()) / float64(time.Millisecond) + log.Debug("QueryEngine: %.3f R:%d W:%d", self.runEndTime-self.runStartTime, self.pointsRead, self.pointsWritten) + + self.SendQueryStats() + } + response := &protocol.Response{Type: &endStreamResponse} if err != nil { message := err.Error() response.ErrorMessage = &message @@ -199,6 +247,40 @@ func (self *QueryEngine) Close() { self.responseChan <- response } +func (self *QueryEngine) SendQueryStats() { + timestamp := time.Now().UnixNano() / int64(time.Microsecond) + + runTime := self.runEndTime - self.runStartTime + points := []*protocol.Point{} + pointsRead := self.pointsRead + pointsWritten := self.pointsWritten + shardId := int64(self.shardId) + shardLocal := self.shardLocal + engineName := "QueryEngine" + + point := &protocol.Point{ + Values: []*protocol.FieldValue{ + &protocol.FieldValue{StringValue: &engineName}, + &protocol.FieldValue{Int64Value: &shardId}, + &protocol.FieldValue{BoolValue: &shardLocal}, + &protocol.FieldValue{DoubleValue: &runTime}, + &protocol.FieldValue{Int64Value: &pointsRead}, + &protocol.FieldValue{Int64Value: &pointsWritten}, + }, + Timestamp: ×tamp, + } + points = append(points, point) + + seriesName := "explain query" + series := &protocol.Series{ + Name: &seriesName, + Fields: []string{"engine_name", "shard_id", "shard_local", "run_time", "points_read", "points_written"}, + Points: points, + } + response := &protocol.Response{Type: &explainQueryResponse, Series: series} + self.responseChan <- response +} + func containsArithmeticOperators(query *parser.SelectQuery) bool { for _, column := range query.GetColumnNames() { if column.Type == parser.ValueExpression { @@ -612,3 +694,7 @@ func (self *QueryEngine) executeArithmeticQuery(query *parser.SelectQuery, yield return nil }) } + +func (self *QueryEngine) GetName() string { + return "QueryEngine" +} diff --git a/src/engine/limiter.go b/src/engine/limiter.go index f96d48cdf1..11f0fe83d7 100644 --- a/src/engine/limiter.go +++ b/src/engine/limiter.go @@ -1,7 +1,7 @@ package engine import ( - "protocol" + "protocol" ) type Limiter struct { @@ -18,10 +18,10 @@ func NewLimiter(limit int) *Limiter { } } -func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { +func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { if self.shouldLimit { - // if the limit is 0, stop returning any points - limit := self.limitForSeries(*series.Name) + // if the limit is 0, stop returning any points + limit := self.limitForSeries(*series.Name) defer func() { self.limits[*series.Name] = limit }() if limit == 0 { series.Points = nil @@ -33,7 +33,7 @@ func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { series.Points = series.Points[0:sliceTo] limit = 0 } - } + } } func (self *Limiter) hitLimit(seriesName string) bool { diff --git a/src/engine/list_series_engine.go b/src/engine/list_series_engine.go index 99b36afb6b..38595879fb 100644 --- a/src/engine/list_series_engine.go +++ b/src/engine/list_series_engine.go @@ -9,8 +9,7 @@ const ( ) var ( - queryResponse = protocol.Response_QUERY - endStreamResponse = protocol.Response_END_STREAM + queryResponse = protocol.Response_QUERY ) type ListSeriesEngine struct { @@ -61,3 +60,11 @@ func (self *ListSeriesEngine) Close() { response := &protocol.Response{Type: &endStreamResponse} self.responseChan <- response } + +func (self *ListSeriesEngine) SetShardInfo(shardId int, shardLocal bool) { + //EXPLAIN doens't work with this query +} + +func (self *ListSeriesEngine) GetName() string { + return "ListSeriesEngine" +} diff --git a/src/engine/passthrough_engine.go b/src/engine/passthrough_engine.go index d45b9ef57c..61371d60ec 100644 --- a/src/engine/passthrough_engine.go +++ b/src/engine/passthrough_engine.go @@ -12,6 +12,15 @@ type PassthroughEngine struct { response *protocol.Response maxPointsInResponse int limiter *Limiter + responseType *protocol.Response_Type + + // query statistics + runStartTime float64 + runEndTime float64 + pointsRead int64 + pointsWritten int64 + shardId int + shardLocal bool } func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine { @@ -19,14 +28,24 @@ func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInRespo } func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine { - return &PassthroughEngine{ + passthroughEngine := &PassthroughEngine{ responseChan: responseChan, maxPointsInResponse: maxPointsInResponse, limiter: NewLimiter(limit), + responseType: &queryResponse, + runStartTime: 0, + runEndTime: 0, + pointsRead: 0, + pointsWritten: 0, + shardId: 0, + shardLocal: false, //that really doesn't matter if it is not EXPLAIN query } + + return passthroughEngine } func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool { + self.responseType = &queryResponse series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames} self.limiter.calculateLimitAndSlicePoints(series) if len(series.Points) == 0 { @@ -35,19 +54,19 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri if self.response == nil { self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: series, } - } else if self.response.Series.Name != seriesName { + } else if *self.response.Series.Name != *seriesName { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: series, } } else if len(self.response.Series.Points) > self.maxPointsInResponse { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: series, } } else { @@ -58,42 +77,34 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) bool { log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points)) -/* - seriesCopy := &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)} - for _, point := range seriesIncoming.Points { - seriesCopy.Points = append(seriesCopy.Points, point) + if *seriesIncoming.Name == "explain query" { + self.responseType = &explainQueryResponse + log.Debug("Response Changed!") + } else { + self.responseType = &queryResponse } -*/ - //log.Debug("PT Copied %d %d", len(seriesIncoming.Points), POINT_BATCH_SIZE) + self.limiter.calculateLimitAndSlicePoints(seriesIncoming) if len(seriesIncoming.Points) == 0 { log.Error("Not sent == 0") return false - } + } - //log.Debug("PassthroughEngine", seriesCopy) - /* - self.response = &protocol.Response{ - Type: &queryResponse, - Series: seriesIncoming, - } - self.responseChan <- self.response - */ if self.response == nil { self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: seriesIncoming, } } else if *self.response.Series.Name != *seriesName { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: seriesIncoming, } } else if len(self.response.Series.Points) > self.maxPointsInResponse { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: seriesIncoming, } } else { @@ -103,7 +114,6 @@ func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []stri //return true } - func (self *PassthroughEngine) Close() { if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil { self.responseChan <- self.response @@ -111,3 +121,11 @@ func (self *PassthroughEngine) Close() { response := &protocol.Response{Type: &endStreamResponse} self.responseChan <- response } + +func (self *PassthroughEngine) SetShardInfo(shardId int, shardLocal bool) { + //EXPLAIN doens't really work with this query (yet ?) +} + +func (self *PassthroughEngine) GetName() string { + return "PassthroughEngine" +} diff --git a/src/parser/parser.go b/src/parser/parser.go index 065fa835f5..4b588673f6 100644 --- a/src/parser/parser.go +++ b/src/parser/parser.go @@ -150,6 +150,7 @@ type SelectQuery struct { IntoClause *IntoClause Limit int Ascending bool + Explain bool } type ListType int @@ -203,6 +204,10 @@ func (self *Query) IsListQuery() bool { return self.ListQuery != nil } +func (self *Query) IsExplainQuery() bool { + return self.SelectQuery != nil && self.SelectQuery.Explain +} + func (self *Query) IsListSeriesQuery() bool { return self.ListQuery != nil && self.ListQuery.Type == Series } @@ -219,6 +224,10 @@ func (self *SelectQuery) GetColumnNames() []*Value { return self.ColumnNames } +func (self *SelectQuery) IsExplainQuery() bool { + return self.Explain +} + func (self *SelectQuery) IsSinglePointQuery() bool { w := self.GetWhereCondition() if w == nil { @@ -617,6 +626,7 @@ func parseSelectQuery(queryString string, q *C.select_query) (*SelectQuery, erro SelectDeleteCommonQuery: basicQuery, Limit: int(limit), Ascending: q.ascending != 0, + Explain: q.explain != 0, } // get the column names diff --git a/src/parser/parser_test.go b/src/parser/parser_test.go index e9112d7dfb..054b43a69e 100644 --- a/src/parser/parser_test.go +++ b/src/parser/parser_test.go @@ -31,6 +31,23 @@ func (self *QueryParserSuite) TestInvalidFromClause(c *C) { c.Assert(err, ErrorMatches, ".*\\$undefined.*") } +func (self *QueryParserSuite) TestInvalidExplainQueries(c *C) { + query := "explain select foo, baz group by time(1d)" + _, err := ParseQuery(query) + + c.Assert(err, NotNil) +} + +func (self *QueryParserSuite) TestExplainQueries(c *C) { + query := "explain select foo, bar from baz group by time(1d)" + queries, err := ParseQuery(query) + + c.Assert(err, IsNil) + c.Assert(queries, HasLen, 1) + c.Assert(queries[0].SelectQuery, NotNil) + c.Assert(queries[0].SelectQuery.IsExplainQuery(), Equals, true) +} + func (self *QueryParserSuite) TestParseBasicSelectQuery(c *C) { for _, query := range []string{ "select value from t where c = '5';", diff --git a/src/parser/query.lex b/src/parser/query.lex index fbf5ba95ad..56790fce22 100644 --- a/src/parser/query.lex +++ b/src/parser/query.lex @@ -58,6 +58,7 @@ static int yycolumn = 1; "where" { BEGIN(INITIAL); return WHERE; } "as" { return AS; } "select" { return SELECT; } +"explain" { return EXPLAIN; } "delete" { return DELETE; } "drop series" { return DROP_SERIES; } "drop" { return DROP; } diff --git a/src/parser/query.yacc b/src/parser/query.yacc index f894fbc83c..236f37f41e 100644 --- a/src/parser/query.yacc +++ b/src/parser/query.yacc @@ -74,7 +74,7 @@ value *create_expression_value(char *operator, size_t size, ...) { %lex-param {void *scanner} // define types of tokens (terminals) -%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES +%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES EXPLAIN %token STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME INTO_NAME REGEX_OP %token NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION @@ -105,6 +105,7 @@ value *create_expression_value(char *operator, size_t size, ...) { %type DROP_SERIES_QUERY %type SELECT_QUERY %type DROP_QUERY +%type EXPLAIN_QUERY // the initial token %start ALL_QUERIES @@ -176,6 +177,12 @@ QUERY: $$ = calloc(1, sizeof(query)); $$->list_continuous_queries_query = TRUE; } + | + EXPLAIN_QUERY + { + $$ = calloc(1, sizeof(query)); + $$->select_query = $1; + } DROP_QUERY: DROP CONTINUOUS_QUERY INT_VALUE @@ -200,6 +207,33 @@ DROP_SERIES_QUERY: $$->name = $2; } +EXPLAIN_QUERY: + EXPLAIN SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE + { + $$ = calloc(1, sizeof(select_query)); + $$->c = $3; + $$->from_clause = $4; + $$->group_by = $5; + $$->where_condition = $6; + $$->limit = $7.limit; + $$->ascending = $7.ascending; + $$->into_clause = $8; + $$->explain = TRUE; + } + | + EXPLAIN SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE + { + $$ = calloc(1, sizeof(select_query)); + $$->c = $3; + $$->from_clause = $4; + $$->where_condition = $5; + $$->group_by = $6; + $$->limit = $7.limit; + $$->ascending = $7.ascending; + $$->into_clause = $8; + $$->explain = TRUE; + } + SELECT_QUERY: SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE { @@ -211,6 +245,7 @@ SELECT_QUERY: $$->limit = $6.limit; $$->ascending = $6.ascending; $$->into_clause = $7; + $$->explain = FALSE; } | SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE @@ -223,6 +258,7 @@ SELECT_QUERY: $$->limit = $6.limit; $$->ascending = $6.ascending; $$->into_clause = $7; + $$->explain = FALSE; } LIMIT_AND_ORDER_CLAUSES: diff --git a/src/parser/query_spec.go b/src/parser/query_spec.go index 113957e831..0c53c1a812 100644 --- a/src/parser/query_spec.go +++ b/src/parser/query_spec.go @@ -109,6 +109,13 @@ func (self *QuerySpec) IsSinglePointQuery() bool { return false } +func (self *QuerySpec) IsExplainQuery() bool { + if self.query.SelectQuery != nil { + return self.query.SelectQuery.IsExplainQuery() + } + return false +} + func (self *QuerySpec) SelectQuery() *SelectQuery { return self.query.SelectQuery } diff --git a/src/parser/query_types.h b/src/parser/query_types.h index c9d5c4775a..63c5665440 100644 --- a/src/parser/query_types.h +++ b/src/parser/query_types.h @@ -88,6 +88,7 @@ typedef struct { condition *where_condition; int limit; char ascending; + char explain; } select_query; typedef struct { @@ -124,4 +125,3 @@ void free_error (error *error); // this is the api that is used in GO query parse_query(char *const query_s); void close_query (query *q); - diff --git a/src/parser/test_memory_leaks.sh b/src/parser/test_memory_leaks.sh index b25c7aff64..9f1833b42c 100755 --- a/src/parser/test_memory_leaks.sh +++ b/src/parser/test_memory_leaks.sh @@ -8,6 +8,9 @@ int main(int argc, char **argv) { query q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time>now()-1d;"); close_query(&q); + q = parse_query("explain select users.events group_by user_email,time(1h) where time>now()-1d;"); + close_query(&q); + // test freeing on error q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time >> now()-1d;"); close_query(&q); diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto index 2a0ce6c2c4..d4ae863e0d 100644 --- a/src/protocol/protocol.proto +++ b/src/protocol/protocol.proto @@ -57,6 +57,7 @@ message Response { // Access denied also serves as an end of stream response ACCESS_DENIED = 8; HEARTBEAT = 9; + EXPLAIN_QUERY = 10; } enum ErrorCode { REQUEST_TOO_LARGE = 1; From f2e9c8a5431d4ac46a16b2ef7e1074a2de9446c3 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Mon, 10 Mar 2014 16:24:48 -0400 Subject: [PATCH 3/6] fix some log statements --- src/coordinator/coordinator.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 889b8074c8..51741c747a 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -78,7 +78,7 @@ func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterC } func (self *CoordinatorImpl) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error) { - log.Debug("COORD: RunQuery: ", queryString) + log.Debug("COORD: RunQuery: %s", queryString) // don't let a panic pass beyond RunQuery defer recoverFunc(database, queryString) @@ -315,14 +315,7 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri // if the data wasn't aggregated at the shard level, aggregate // the data here if response.Series != nil { - log.Debug("YIELDING: ", len(response.Series.Points)) - - // we need to forward message type to PasstrhoughEngine - // this is a bit dirty TODO: refactor it... - if querySpec.IsExplainQuery() && processor.GetName() == "PassthroughEngine" { - - } - + log.Debug("YIELDING: %d points", len(response.Series.Points)) processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series) } } From f5706829e20217d5e348b689c7c34620cc278dc3 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 12 Mar 2014 17:55:43 -0400 Subject: [PATCH 4/6] DRY the parser a little bit, we already have a definition for SELECT --- src/parser/query.yacc | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/src/parser/query.yacc b/src/parser/query.yacc index 236f37f41e..cef7733e8f 100644 --- a/src/parser/query.yacc +++ b/src/parser/query.yacc @@ -208,29 +208,9 @@ DROP_SERIES_QUERY: } EXPLAIN_QUERY: - EXPLAIN SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE + EXPLAIN SELECT_QUERY { - $$ = calloc(1, sizeof(select_query)); - $$->c = $3; - $$->from_clause = $4; - $$->group_by = $5; - $$->where_condition = $6; - $$->limit = $7.limit; - $$->ascending = $7.ascending; - $$->into_clause = $8; - $$->explain = TRUE; - } - | - EXPLAIN SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE - { - $$ = calloc(1, sizeof(select_query)); - $$->c = $3; - $$->from_clause = $4; - $$->where_condition = $5; - $$->group_by = $6; - $$->limit = $7.limit; - $$->ascending = $7.ascending; - $$->into_clause = $8; + $$ = $2; $$->explain = TRUE; } From df84b7fe79758859e4b620cc049a5225cfbcafb2 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 12 Mar 2014 19:37:16 -0400 Subject: [PATCH 5/6] fix few bugs introduced in dbe76df The changes in that commit broke Join and Merge, because they were bypassing the QueryEngine which kept track of the series that should be closed at the end of the query. Aggregators require an empty time series to be sent after the query ends to flush their internal state. The other bug was in deletes. Fixing the first bug broke the new behavior of deletes which is return nothing if the time series is empty. This is also fixed. --- src/cluster/shard.go | 4 +++- src/coordinator/coordinator.go | 28 ++++++++++++++++------------ src/datastore/leveldb_shard.go | 14 +++++++++----- src/engine/engine.go | 8 +++++++- src/engine/list_series_engine.go | 4 ++-- src/engine/merge.go | 4 ++++ src/engine/passthrough_engine.go | 6 +++--- 7 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 71064b5694..519576daff 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -31,7 +31,7 @@ type QueryProcessor interface { // This method returns true if the query should continue. If the query should be stopped, // like maybe the limit was hit, it should return false YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool - YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool + YieldSeries(seriesIncoming *p.Series) bool Close() // Set by the shard, so EXPLAIN query can know query against which shard is being measured @@ -217,6 +217,7 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo processor = engine.NewPassthroughEngine(response, maxDeleteResults) } else { if self.ShouldAggregateLocally(querySpec) { + fmt.Printf("creating a query engine\n") processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response) if err != nil { response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())} @@ -226,6 +227,7 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo processor.SetShardInfo(int(self.Id()), self.IsLocal) } else { maxPointsToBufferBeforeSending := 1000 + fmt.Printf("creating a passthrough engine\n") processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending) } } diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 51741c747a..37f215087a 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -301,22 +301,26 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri break } - // if we don't have a processor, yield the point to the writer - // this happens if shard took care of the query - // otherwise client will get points from passthrough engine - if processor == nil { - // If we have EXPLAIN query, we don't write actual points (of response.Type Query) to the client - if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { - seriesWriter.Write(response.Series) - } + if response.Series == nil || len(response.Series.Points) == 0 { + log.Debug("Series has no points, continue") continue } - // if the data wasn't aggregated at the shard level, aggregate - // the data here - if response.Series != nil { + // if we don't have a processor, yield the point to the writer + // this happens if shard took care of the query + // otherwise client will get points from passthrough engine + if processor != nil { + // if the data wasn't aggregated at the shard level, aggregate + // the data here log.Debug("YIELDING: %d points", len(response.Series.Points)) - processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series) + processor.YieldSeries(response.Series) + continue + } + + // If we have EXPLAIN query, we don't write actual points (of + // response.Type Query) to the client + if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { + seriesWriter.Write(response.Series) } } log.Debug("DONE: shard: ", shards[i].String()) diff --git a/src/datastore/leveldb_shard.go b/src/datastore/leveldb_shard.go index ee1135c3ec..03a1ab320f 100644 --- a/src/datastore/leveldb_shard.go +++ b/src/datastore/leveldb_shard.go @@ -281,8 +281,12 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser if len(seriesOutgoing.Points) >= self.pointBatchSize { for _, alias := range aliases { - _alias := alias - if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) { + series := &protocol.Series{ + Name: proto.String(alias), + Fields: fieldNames, + Points: seriesOutgoing.Points, + } + if !processor.YieldSeries(series) { shouldContinue = false } } @@ -296,9 +300,9 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser //Yield remaining data for _, alias := range aliases { - _alias := alias - log.Debug("Final Flush %s", _alias) - if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) { + log.Debug("Final Flush %s", alias) + series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points} + if !processor.YieldSeries(series) { log.Debug("Cancelled...") } } diff --git a/src/engine/engine.go b/src/engine/engine.go index 0689cff8ce..803146faca 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -145,13 +145,16 @@ func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, poi self.pointsRead++ } + fmt.Printf("self.seriesToPoints: %#v\n", self.seriesToPoints) return shouldContinue } -func (self *QueryEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) (shouldContinue bool) { +func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldContinue bool) { if self.explain { self.pointsRead += int64(len(seriesIncoming.Points)) } + seriesName := seriesIncoming.GetName() + self.seriesToPoints[seriesName] = &protocol.Series{Name: &seriesName, Fields: seriesIncoming.Fields} return self.yieldSeriesData(seriesIncoming) } @@ -210,6 +213,8 @@ func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, er } func (self *QueryEngine) Close() { + fmt.Printf("Closing: %#v\n", self.seriesToPoints) + for _, series := range self.seriesToPoints { if len(series.Points) == 0 { continue @@ -223,6 +228,7 @@ func (self *QueryEngine) Close() { Name: series.Name, Fields: series.Fields, } + fmt.Printf("yielding empty series for %s\n", series.GetName()) err = self.yield(s) if err != nil { break diff --git a/src/engine/list_series_engine.go b/src/engine/list_series_engine.go index 38595879fb..753fbd7faf 100644 --- a/src/engine/list_series_engine.go +++ b/src/engine/list_series_engine.go @@ -41,7 +41,7 @@ func (self *ListSeriesEngine) YieldPoint(seriesName *string, columnNames []strin return true } -func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []string, seriesIncoming *protocol.Series) bool { +func (self *ListSeriesEngine) YieldSeries(seriesIncoming *protocol.Series) bool { if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE { self.responseChan <- self.response self.response = &protocol.Response{ @@ -49,7 +49,7 @@ func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []stri MultiSeries: make([]*protocol.Series, 0), } } - self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesName}) + self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesIncoming.Name}) return true } diff --git a/src/engine/merge.go b/src/engine/merge.go index 779f092139..6e8b17720e 100644 --- a/src/engine/merge.go +++ b/src/engine/merge.go @@ -1,6 +1,7 @@ package engine import ( + "fmt" "parser" "protocol" ) @@ -16,6 +17,7 @@ func getJoinYield(query *parser.SelectQuery, yield func(*protocol.Series) error) name := table1 + "_join_" + table2 return mergeYield(table1, table2, false, query.Ascending, func(s *protocol.Series) error { + fmt.Printf("join series: %d\n", len(s.Points)) if *s.Name == table1 { lastPoint1 = s.Points[len(s.Points)-1] if lastFields1 == nil { @@ -269,6 +271,8 @@ func mergeYield(table1, table2 string, modifyValues bool, ascending bool, yield } return func(p *protocol.Series) error { + fmt.Printf("series: %v\n", len(p.Points)) + state.updateState(p) if err := state.flushIfNecessary(yield); err != nil { diff --git a/src/engine/passthrough_engine.go b/src/engine/passthrough_engine.go index 61371d60ec..2576323080 100644 --- a/src/engine/passthrough_engine.go +++ b/src/engine/passthrough_engine.go @@ -75,7 +75,7 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri return !self.limiter.hitLimit(*seriesName) } -func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) bool { +func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool { log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points)) if *seriesIncoming.Name == "explain query" { self.responseType = &explainQueryResponse @@ -95,7 +95,7 @@ func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []stri Type: self.responseType, Series: seriesIncoming, } - } else if *self.response.Series.Name != *seriesName { + } else if self.response.Series.GetName() != seriesIncoming.GetName() { self.responseChan <- self.response self.response = &protocol.Response{ Type: self.responseType, @@ -110,7 +110,7 @@ func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []stri } else { self.response.Series.Points = append(self.response.Series.Points, seriesIncoming.Points...) } - return !self.limiter.hitLimit(*seriesName) + return !self.limiter.hitLimit(seriesIncoming.GetName()) //return true } From 7bda031982ac07c7ec64d97baf4e2a85f15ba3ac Mon Sep 17 00:00:00 2001 From: John Shahid Date: Thu, 13 Mar 2014 15:40:19 -0400 Subject: [PATCH 6/6] change the default log level to debug --- config.toml.sample | 2 +- src/integration/test_config1.toml | 2 +- src/integration/test_config2.toml | 2 +- src/integration/test_config3.toml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config.toml.sample b/config.toml.sample index 19a62fc2de..f2bc75a3d5 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -9,7 +9,7 @@ bind-address = "0.0.0.0" [logging] # logging level can be one of "debug", "info", "warn" or "error" -level = "info" +level = "debug" file = "influxdb.log" # stdout to log to standard out # Configure the admin server diff --git a/src/integration/test_config1.toml b/src/integration/test_config1.toml index 9ffd56e598..39fc7ae86f 100644 --- a/src/integration/test_config1.toml +++ b/src/integration/test_config1.toml @@ -7,7 +7,7 @@ [logging] # logging level can be one of "debug", "info", "warn" or "error" -level = "info" +level = "debug" file = "/tmp/influxdb/test/1/influxdb.log" # Configure the admin server diff --git a/src/integration/test_config2.toml b/src/integration/test_config2.toml index 7b4be21198..59d0052fa7 100644 --- a/src/integration/test_config2.toml +++ b/src/integration/test_config2.toml @@ -7,7 +7,7 @@ [logging] # logging level can be one of "debug", "info", "warn" or "error" -level = "info" +level = "debug" file = "/tmp/influxdb/test/2/influxdb.log" # Configure the admin server diff --git a/src/integration/test_config3.toml b/src/integration/test_config3.toml index b295ffce51..4f7fd822f2 100644 --- a/src/integration/test_config3.toml +++ b/src/integration/test_config3.toml @@ -7,7 +7,7 @@ [logging] # logging level can be one of "debug", "info", "warn" or "error" -level = "info" +level = "debug" file = "/tmp/influxdb/test/3/influxdb.log" # Configure the admin server