From e2adcf1c581dc5b6074901d11656887ffca2cdb1 Mon Sep 17 00:00:00 2001 From: Pawel Szymanski Date: Sat, 8 Mar 2014 19:28:54 -0800 Subject: [PATCH] Close #318. Support EXPLAIN queries --- CHANGELOG.md | 2 + src/cluster/shard.go | 13 +-- src/coordinator/coordinator.go | 43 ++++++--- src/coordinator/protobuf_request_handler.go | 3 +- src/engine/aggregator.go | 14 ++- src/engine/engine.go | 102 ++++++++++++++++++-- src/engine/limiter.go | 10 +- src/engine/list_series_engine.go | 11 ++- src/engine/passthrough_engine.go | 66 ++++++++----- src/parser/parser.go | 10 ++ src/parser/parser_test.go | 17 ++++ src/parser/query.lex | 1 + src/parser/query.yacc | 38 +++++++- src/parser/query_spec.go | 7 ++ src/parser/query_types.h | 2 +- src/parser/test_memory_leaks.sh | 3 + src/protocol/protocol.proto | 1 + 17 files changed, 273 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69131d2846..f2cfa84374 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + ## v0.0.1 [2013-10-22] * Initial Release @@ -296,3 +297,4 @@ - [Issue #333](https://github.com/influxdb/influxdb/issues/333). Better error message when password is invalid and don't create the user if the password is invalid +- [Issue #318](https://github.com/influxdb/influxdb/issues/318). Support EXPLAIN queries diff --git a/src/cluster/shard.go b/src/cluster/shard.go index e8355ffad5..71064b5694 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -33,6 +33,12 @@ type QueryProcessor interface { YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool Close() + + // Set by the shard, so EXPLAIN query can know query against which shard is being measured + SetShardInfo(shardId int, shardLocal bool) + + // Let QueryProcessor identify itself. What if it is a spy and we can't check that? + GetName() string } type NewShardData struct { @@ -217,15 +223,10 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo log.Error("Error while creating engine: %s", err) return } - if querySpec.IsExplainQuery() { - processor.SetShardInfo(int(self.Id()), self.IsLocal) - } + processor.SetShardInfo(int(self.Id()), self.IsLocal) } else { maxPointsToBufferBeforeSending := 1000 processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending) - if querySpec.IsExplainQuery() { - processor.SetShardInfo(int(self.Id()), self.IsLocal) - } } } shard, err := self.store.GetOrCreateShard(self.id) diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 9cf224fb26..889b8074c8 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -42,12 +42,13 @@ var ( // shorter constants for readability var ( - dropDatabase = protocol.Request_DROP_DATABASE - queryRequest = protocol.Request_QUERY - endStreamResponse = protocol.Response_END_STREAM - queryResponse = protocol.Response_QUERY - heartbeatResponse = protocol.Response_HEARTBEAT - write = protocol.Request_WRITE + dropDatabase = protocol.Request_DROP_DATABASE + queryRequest = protocol.Request_QUERY + endStreamResponse = protocol.Response_END_STREAM + queryResponse = protocol.Response_QUERY + heartbeatResponse = protocol.Response_HEARTBEAT + explainQueryResponse = protocol.Response_EXPLAIN_QUERY + write = protocol.Request_WRITE ) type SeriesWriter interface { @@ -254,14 +255,17 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, go func() { for { - res := <-responseChan - if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse { + response := <-responseChan + + if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { writer.Close() seriesClosed <- true return } - if res.Series != nil && len(res.Series.Points) > 0 { - writer.Write(res.Series) + if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { + if response.Series != nil && len(response.Series.Points) > 0 { + writer.Write(response.Series) + } } } }() @@ -278,6 +282,7 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri responses := make([]chan *protocol.Response, 0) for _, shard := range shards { responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize) + // We query shards for data and stream them to query processor go shard.Query(querySpec, responseChan) responses = append(responses, responseChan) } @@ -297,10 +302,13 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri } // if we don't have a processor, yield the point to the writer + // this happens if shard took care of the query + // otherwise client will get points from passthrough engine if processor == nil { - log.Debug("WRITING: ", len(response.Series.Points)) - seriesWriter.Write(response.Series) - log.Debug("WRITING (done)") + // If we have EXPLAIN query, we don't write actual points (of response.Type Query) to the client + if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { + seriesWriter.Write(response.Series) + } continue } @@ -308,8 +316,15 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri // the data here if response.Series != nil { log.Debug("YIELDING: ", len(response.Series.Points)) + + // we need to forward message type to PasstrhoughEngine + // this is a bit dirty TODO: refactor it... + if querySpec.IsExplainQuery() && processor.GetName() == "PassthroughEngine" { + + } + processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series) - } + } } log.Debug("DONE: shard: ", shards[i].String()) } diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index 02d4325358..be8f1542b5 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -58,7 +58,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn // the query should always parse correctly since it was parsed at the originating server. queries, err := parser.ParseQuery(*request.Query) if err != nil || len(queries) < 1 { - log.Error("Erorr parsing query: ", err) + log.Error("Error parsing query: ", err) errorMsg := fmt.Sprintf("Cannot find user %s", *request.UserName) response := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &errorMsg, RequestId: request.Id} self.WriteResponse(conn, response) @@ -80,6 +80,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn } shard := self.clusterConfig.GetLocalShardById(*request.ShardId) + querySpec := parser.NewQuerySpec(user, *request.Database, query) responseChan := make(chan *protocol.Response) diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index 45a00bdbf9..654f15429f 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -1,6 +1,6 @@ package engine -import ( +import ( "common" "fmt" "math" @@ -554,7 +554,7 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{} if timestamps == nil { timestamps = make(map[interface{}]int64) self.timestamps[series] = timestamps - } + } if self.duration != nil { timestamps[group] = *p.GetTimestampInMicroseconds() / *self.duration * *self.duration } else { @@ -571,18 +571,19 @@ func (self *TimestampAggregator) AggregateSeries(series string, group interface{ } if len(s.Points) > 0 { if self.duration != nil { - timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() / *self.duration * *self.duration + timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds() / *self.duration * *self.duration } else { - timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() + timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds() } } return nil } + /* //TODO: to be optimized func (self *TimestampAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { //log.Error("Timestamp: ", len(s.Points)) - for _, p := range s.Points { + for _, p := range s.Points { //log.Error("Point: ", p) self.AggregatePoint(series, group, p) } @@ -614,12 +615,9 @@ func NewTimestampAggregator(query *parser.SelectQuery, _ *parser.Value) (Aggrega var durationPtr *int64 - //log.Error("Duration: ", duration) - if duration != nil { newDuration := int64(*duration / time.Microsecond) durationPtr = &newDuration - // log.Error("Woohoo! ", durationPtr) } return &TimestampAggregator{ diff --git a/src/engine/engine.go b/src/engine/engine.go index 60d4c6f68b..0689cff8ce 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -29,17 +29,26 @@ type QueryEngine struct { pointsRange map[string]*PointRange groupBy *parser.GroupByClause aggregateYield func(*protocol.Series) error + explain bool + + // query statistics + runStartTime float64 + runEndTime float64 + pointsRead int64 + pointsWritten int64 + shardId int + shardLocal bool } +var ( + endStreamResponse = protocol.Response_END_STREAM + explainQueryResponse = protocol.Response_EXPLAIN_QUERY +) + const ( POINT_BATCH_SIZE = 64 ) -var ( - responseQuery = protocol.Response_QUERY - responseEndStream = protocol.Response_END_STREAM -) - // distribute query and possibly do the merge/join before yielding the points func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error { // see if this is a merge query @@ -69,11 +78,30 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo where: query.GetWhereCondition(), limiter: NewLimiter(limit), responseChan: responseChan, - seriesToPoints: make(map[string]*protocol.Series), + seriesToPoints: make(map[string]*protocol.Series), + // stats stuff + explain: query.IsExplainQuery(), + runStartTime: 0, + runEndTime: 0, + pointsRead: 0, + pointsWritten: 0, + shardId: 0, + shardLocal: false, //that really doesn't matter if it is not EXPLAIN query + } + + if queryEngine.explain { + queryEngine.runStartTime = float64(time.Now().UnixNano()) / float64(time.Millisecond) } yield := func(series *protocol.Series) error { - response := &protocol.Response{Type: &responseQuery, Series: series} + var response *protocol.Response + + if queryEngine.explain { + //TODO: We may not have to send points, just count them + queryEngine.pointsWritten += int64(len(series.Points)) + } + + response = &protocol.Response{Type: &queryResponse, Series: series} responseChan <- response return nil } @@ -93,6 +121,12 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo return queryEngine, nil } +// Shard will call this method for EXPLAIN query +func (self *QueryEngine) SetShardInfo(shardId int, shardLocal bool) { + self.shardId = shardId + self.shardLocal = shardLocal +} + // Returns false if the query should be stopped (either because of limit or error) func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, point *protocol.Point) (shouldContinue bool) { shouldContinue = true @@ -107,10 +141,17 @@ func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, poi } series.Points = append(series.Points, point) + if self.explain { + self.pointsRead++ + } + return shouldContinue } func (self *QueryEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) (shouldContinue bool) { + if self.explain { + self.pointsRead += int64(len(seriesIncoming.Points)) + } return self.yieldSeriesData(seriesIncoming) } @@ -191,7 +232,14 @@ func (self *QueryEngine) Close() { if self.isAggregateQuery { self.runAggregates() } - response := &protocol.Response{Type: &responseEndStream} + + if self.explain { + self.runEndTime = float64(time.Now().UnixNano()) / float64(time.Millisecond) + log.Debug("QueryEngine: %.3f R:%d W:%d", self.runEndTime-self.runStartTime, self.pointsRead, self.pointsWritten) + + self.SendQueryStats() + } + response := &protocol.Response{Type: &endStreamResponse} if err != nil { message := err.Error() response.ErrorMessage = &message @@ -199,6 +247,40 @@ func (self *QueryEngine) Close() { self.responseChan <- response } +func (self *QueryEngine) SendQueryStats() { + timestamp := time.Now().UnixNano() / int64(time.Microsecond) + + runTime := self.runEndTime - self.runStartTime + points := []*protocol.Point{} + pointsRead := self.pointsRead + pointsWritten := self.pointsWritten + shardId := int64(self.shardId) + shardLocal := self.shardLocal + engineName := "QueryEngine" + + point := &protocol.Point{ + Values: []*protocol.FieldValue{ + &protocol.FieldValue{StringValue: &engineName}, + &protocol.FieldValue{Int64Value: &shardId}, + &protocol.FieldValue{BoolValue: &shardLocal}, + &protocol.FieldValue{DoubleValue: &runTime}, + &protocol.FieldValue{Int64Value: &pointsRead}, + &protocol.FieldValue{Int64Value: &pointsWritten}, + }, + Timestamp: ×tamp, + } + points = append(points, point) + + seriesName := "explain query" + series := &protocol.Series{ + Name: &seriesName, + Fields: []string{"engine_name", "shard_id", "shard_local", "run_time", "points_read", "points_written"}, + Points: points, + } + response := &protocol.Response{Type: &explainQueryResponse, Series: series} + self.responseChan <- response +} + func containsArithmeticOperators(query *parser.SelectQuery) bool { for _, column := range query.GetColumnNames() { if column.Type == parser.ValueExpression { @@ -612,3 +694,7 @@ func (self *QueryEngine) executeArithmeticQuery(query *parser.SelectQuery, yield return nil }) } + +func (self *QueryEngine) GetName() string { + return "QueryEngine" +} diff --git a/src/engine/limiter.go b/src/engine/limiter.go index f96d48cdf1..11f0fe83d7 100644 --- a/src/engine/limiter.go +++ b/src/engine/limiter.go @@ -1,7 +1,7 @@ package engine import ( - "protocol" + "protocol" ) type Limiter struct { @@ -18,10 +18,10 @@ func NewLimiter(limit int) *Limiter { } } -func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { +func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { if self.shouldLimit { - // if the limit is 0, stop returning any points - limit := self.limitForSeries(*series.Name) + // if the limit is 0, stop returning any points + limit := self.limitForSeries(*series.Name) defer func() { self.limits[*series.Name] = limit }() if limit == 0 { series.Points = nil @@ -33,7 +33,7 @@ func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { series.Points = series.Points[0:sliceTo] limit = 0 } - } + } } func (self *Limiter) hitLimit(seriesName string) bool { diff --git a/src/engine/list_series_engine.go b/src/engine/list_series_engine.go index 99b36afb6b..38595879fb 100644 --- a/src/engine/list_series_engine.go +++ b/src/engine/list_series_engine.go @@ -9,8 +9,7 @@ const ( ) var ( - queryResponse = protocol.Response_QUERY - endStreamResponse = protocol.Response_END_STREAM + queryResponse = protocol.Response_QUERY ) type ListSeriesEngine struct { @@ -61,3 +60,11 @@ func (self *ListSeriesEngine) Close() { response := &protocol.Response{Type: &endStreamResponse} self.responseChan <- response } + +func (self *ListSeriesEngine) SetShardInfo(shardId int, shardLocal bool) { + //EXPLAIN doens't work with this query +} + +func (self *ListSeriesEngine) GetName() string { + return "ListSeriesEngine" +} diff --git a/src/engine/passthrough_engine.go b/src/engine/passthrough_engine.go index d45b9ef57c..61371d60ec 100644 --- a/src/engine/passthrough_engine.go +++ b/src/engine/passthrough_engine.go @@ -12,6 +12,15 @@ type PassthroughEngine struct { response *protocol.Response maxPointsInResponse int limiter *Limiter + responseType *protocol.Response_Type + + // query statistics + runStartTime float64 + runEndTime float64 + pointsRead int64 + pointsWritten int64 + shardId int + shardLocal bool } func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine { @@ -19,14 +28,24 @@ func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInRespo } func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine { - return &PassthroughEngine{ + passthroughEngine := &PassthroughEngine{ responseChan: responseChan, maxPointsInResponse: maxPointsInResponse, limiter: NewLimiter(limit), + responseType: &queryResponse, + runStartTime: 0, + runEndTime: 0, + pointsRead: 0, + pointsWritten: 0, + shardId: 0, + shardLocal: false, //that really doesn't matter if it is not EXPLAIN query } + + return passthroughEngine } func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool { + self.responseType = &queryResponse series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames} self.limiter.calculateLimitAndSlicePoints(series) if len(series.Points) == 0 { @@ -35,19 +54,19 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri if self.response == nil { self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: series, } - } else if self.response.Series.Name != seriesName { + } else if *self.response.Series.Name != *seriesName { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: series, } } else if len(self.response.Series.Points) > self.maxPointsInResponse { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: series, } } else { @@ -58,42 +77,34 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) bool { log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points)) -/* - seriesCopy := &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)} - for _, point := range seriesIncoming.Points { - seriesCopy.Points = append(seriesCopy.Points, point) + if *seriesIncoming.Name == "explain query" { + self.responseType = &explainQueryResponse + log.Debug("Response Changed!") + } else { + self.responseType = &queryResponse } -*/ - //log.Debug("PT Copied %d %d", len(seriesIncoming.Points), POINT_BATCH_SIZE) + self.limiter.calculateLimitAndSlicePoints(seriesIncoming) if len(seriesIncoming.Points) == 0 { log.Error("Not sent == 0") return false - } + } - //log.Debug("PassthroughEngine", seriesCopy) - /* - self.response = &protocol.Response{ - Type: &queryResponse, - Series: seriesIncoming, - } - self.responseChan <- self.response - */ if self.response == nil { self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: seriesIncoming, } } else if *self.response.Series.Name != *seriesName { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: seriesIncoming, } } else if len(self.response.Series.Points) > self.maxPointsInResponse { self.responseChan <- self.response self.response = &protocol.Response{ - Type: &queryResponse, + Type: self.responseType, Series: seriesIncoming, } } else { @@ -103,7 +114,6 @@ func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []stri //return true } - func (self *PassthroughEngine) Close() { if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil { self.responseChan <- self.response @@ -111,3 +121,11 @@ func (self *PassthroughEngine) Close() { response := &protocol.Response{Type: &endStreamResponse} self.responseChan <- response } + +func (self *PassthroughEngine) SetShardInfo(shardId int, shardLocal bool) { + //EXPLAIN doens't really work with this query (yet ?) +} + +func (self *PassthroughEngine) GetName() string { + return "PassthroughEngine" +} diff --git a/src/parser/parser.go b/src/parser/parser.go index 065fa835f5..4b588673f6 100644 --- a/src/parser/parser.go +++ b/src/parser/parser.go @@ -150,6 +150,7 @@ type SelectQuery struct { IntoClause *IntoClause Limit int Ascending bool + Explain bool } type ListType int @@ -203,6 +204,10 @@ func (self *Query) IsListQuery() bool { return self.ListQuery != nil } +func (self *Query) IsExplainQuery() bool { + return self.SelectQuery != nil && self.SelectQuery.Explain +} + func (self *Query) IsListSeriesQuery() bool { return self.ListQuery != nil && self.ListQuery.Type == Series } @@ -219,6 +224,10 @@ func (self *SelectQuery) GetColumnNames() []*Value { return self.ColumnNames } +func (self *SelectQuery) IsExplainQuery() bool { + return self.Explain +} + func (self *SelectQuery) IsSinglePointQuery() bool { w := self.GetWhereCondition() if w == nil { @@ -617,6 +626,7 @@ func parseSelectQuery(queryString string, q *C.select_query) (*SelectQuery, erro SelectDeleteCommonQuery: basicQuery, Limit: int(limit), Ascending: q.ascending != 0, + Explain: q.explain != 0, } // get the column names diff --git a/src/parser/parser_test.go b/src/parser/parser_test.go index e9112d7dfb..054b43a69e 100644 --- a/src/parser/parser_test.go +++ b/src/parser/parser_test.go @@ -31,6 +31,23 @@ func (self *QueryParserSuite) TestInvalidFromClause(c *C) { c.Assert(err, ErrorMatches, ".*\\$undefined.*") } +func (self *QueryParserSuite) TestInvalidExplainQueries(c *C) { + query := "explain select foo, baz group by time(1d)" + _, err := ParseQuery(query) + + c.Assert(err, NotNil) +} + +func (self *QueryParserSuite) TestExplainQueries(c *C) { + query := "explain select foo, bar from baz group by time(1d)" + queries, err := ParseQuery(query) + + c.Assert(err, IsNil) + c.Assert(queries, HasLen, 1) + c.Assert(queries[0].SelectQuery, NotNil) + c.Assert(queries[0].SelectQuery.IsExplainQuery(), Equals, true) +} + func (self *QueryParserSuite) TestParseBasicSelectQuery(c *C) { for _, query := range []string{ "select value from t where c = '5';", diff --git a/src/parser/query.lex b/src/parser/query.lex index fbf5ba95ad..56790fce22 100644 --- a/src/parser/query.lex +++ b/src/parser/query.lex @@ -58,6 +58,7 @@ static int yycolumn = 1; "where" { BEGIN(INITIAL); return WHERE; } "as" { return AS; } "select" { return SELECT; } +"explain" { return EXPLAIN; } "delete" { return DELETE; } "drop series" { return DROP_SERIES; } "drop" { return DROP; } diff --git a/src/parser/query.yacc b/src/parser/query.yacc index f894fbc83c..236f37f41e 100644 --- a/src/parser/query.yacc +++ b/src/parser/query.yacc @@ -74,7 +74,7 @@ value *create_expression_value(char *operator, size_t size, ...) { %lex-param {void *scanner} // define types of tokens (terminals) -%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES +%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES EXPLAIN %token STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME INTO_NAME REGEX_OP %token NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION @@ -105,6 +105,7 @@ value *create_expression_value(char *operator, size_t size, ...) { %type DROP_SERIES_QUERY %type SELECT_QUERY %type DROP_QUERY +%type EXPLAIN_QUERY // the initial token %start ALL_QUERIES @@ -176,6 +177,12 @@ QUERY: $$ = calloc(1, sizeof(query)); $$->list_continuous_queries_query = TRUE; } + | + EXPLAIN_QUERY + { + $$ = calloc(1, sizeof(query)); + $$->select_query = $1; + } DROP_QUERY: DROP CONTINUOUS_QUERY INT_VALUE @@ -200,6 +207,33 @@ DROP_SERIES_QUERY: $$->name = $2; } +EXPLAIN_QUERY: + EXPLAIN SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE + { + $$ = calloc(1, sizeof(select_query)); + $$->c = $3; + $$->from_clause = $4; + $$->group_by = $5; + $$->where_condition = $6; + $$->limit = $7.limit; + $$->ascending = $7.ascending; + $$->into_clause = $8; + $$->explain = TRUE; + } + | + EXPLAIN SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE + { + $$ = calloc(1, sizeof(select_query)); + $$->c = $3; + $$->from_clause = $4; + $$->where_condition = $5; + $$->group_by = $6; + $$->limit = $7.limit; + $$->ascending = $7.ascending; + $$->into_clause = $8; + $$->explain = TRUE; + } + SELECT_QUERY: SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE { @@ -211,6 +245,7 @@ SELECT_QUERY: $$->limit = $6.limit; $$->ascending = $6.ascending; $$->into_clause = $7; + $$->explain = FALSE; } | SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE @@ -223,6 +258,7 @@ SELECT_QUERY: $$->limit = $6.limit; $$->ascending = $6.ascending; $$->into_clause = $7; + $$->explain = FALSE; } LIMIT_AND_ORDER_CLAUSES: diff --git a/src/parser/query_spec.go b/src/parser/query_spec.go index 113957e831..0c53c1a812 100644 --- a/src/parser/query_spec.go +++ b/src/parser/query_spec.go @@ -109,6 +109,13 @@ func (self *QuerySpec) IsSinglePointQuery() bool { return false } +func (self *QuerySpec) IsExplainQuery() bool { + if self.query.SelectQuery != nil { + return self.query.SelectQuery.IsExplainQuery() + } + return false +} + func (self *QuerySpec) SelectQuery() *SelectQuery { return self.query.SelectQuery } diff --git a/src/parser/query_types.h b/src/parser/query_types.h index c9d5c4775a..63c5665440 100644 --- a/src/parser/query_types.h +++ b/src/parser/query_types.h @@ -88,6 +88,7 @@ typedef struct { condition *where_condition; int limit; char ascending; + char explain; } select_query; typedef struct { @@ -124,4 +125,3 @@ void free_error (error *error); // this is the api that is used in GO query parse_query(char *const query_s); void close_query (query *q); - diff --git a/src/parser/test_memory_leaks.sh b/src/parser/test_memory_leaks.sh index b25c7aff64..9f1833b42c 100755 --- a/src/parser/test_memory_leaks.sh +++ b/src/parser/test_memory_leaks.sh @@ -8,6 +8,9 @@ int main(int argc, char **argv) { query q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time>now()-1d;"); close_query(&q); + q = parse_query("explain select users.events group_by user_email,time(1h) where time>now()-1d;"); + close_query(&q); + // test freeing on error q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time >> now()-1d;"); close_query(&q); diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto index 2a0ce6c2c4..d4ae863e0d 100644 --- a/src/protocol/protocol.proto +++ b/src/protocol/protocol.proto @@ -57,6 +57,7 @@ message Response { // Access denied also serves as an end of stream response ACCESS_DENIED = 8; HEARTBEAT = 9; + EXPLAIN_QUERY = 10; } enum ErrorCode { REQUEST_TOO_LARGE = 1;