diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69131d2846..f2cfa84374 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,4 @@
+
 ## v0.0.1 [2013-10-22]
 
 * Initial Release
@@ -296,3 +297,4 @@
 
 - [Issue #333](https://github.com/influxdb/influxdb/issues/333). Better error message when password is invalid and don't create the user if the password is invalid
+- [Issue #318](https://github.com/influxdb/influxdb/issues/318). Support EXPLAIN queries
diff --git a/config.toml.sample b/config.toml.sample
index 19a62fc2de..f2bc75a3d5 100644
--- a/config.toml.sample
+++ b/config.toml.sample
@@ -9,7 +9,7 @@ bind-address = "0.0.0.0"
 
 [logging]
 # logging level can be one of "debug", "info", "warn" or "error"
-level = "info"
+level = "debug"
 file = "influxdb.log" # stdout to log to standard out
 
 # Configure the admin server
diff --git a/src/cluster/shard.go b/src/cluster/shard.go
index e8355ffad5..519576daff 100644
--- a/src/cluster/shard.go
+++ b/src/cluster/shard.go
@@ -31,8 +31,14 @@ type QueryProcessor interface {
 	// This method returns true if the query should continue. If the query should be stopped,
 	// like maybe the limit was hit, it should return false
 	YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
-	YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool
+	YieldSeries(seriesIncoming *p.Series) bool
 	Close()
+
+	// Set by the shard, so an EXPLAIN query knows which shard is being measured
+	SetShardInfo(shardId int, shardLocal bool)
+
+	// Lets a QueryProcessor identify itself by name
+	GetName() string
 }
 
 type NewShardData struct {
@@ -211,21 +217,18 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
 		processor = engine.NewPassthroughEngine(response, maxDeleteResults)
 	} else {
 		if self.ShouldAggregateLocally(querySpec) {
+			fmt.Printf("creating a query engine\n")
 			processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
 			if err != nil {
 				response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
 				log.Error("Error while creating engine: %s", err)
 				return
 			}
-			if querySpec.IsExplainQuery() {
-				processor.SetShardInfo(int(self.Id()), self.IsLocal)
-			}
+			processor.SetShardInfo(int(self.Id()), self.IsLocal)
 		} else {
 			maxPointsToBufferBeforeSending := 1000
+			fmt.Printf("creating a passthrough engine\n")
 			processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
-			if querySpec.IsExplainQuery() {
-				processor.SetShardInfo(int(self.Id()), self.IsLocal)
-			}
 		}
 	}
 	shard, err := self.store.GetOrCreateShard(self.id)
diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go
index 9cf224fb26..37f215087a 100644
--- a/src/coordinator/coordinator.go
+++ b/src/coordinator/coordinator.go
@@ -42,12 +42,13 @@ var (
 
 // shorter constants for readability
 var (
-	dropDatabase      = protocol.Request_DROP_DATABASE
-	queryRequest      = protocol.Request_QUERY
-	endStreamResponse = protocol.Response_END_STREAM
-	queryResponse     = protocol.Response_QUERY
-	heartbeatResponse = protocol.Response_HEARTBEAT
-	write             = protocol.Request_WRITE
+	dropDatabase         = protocol.Request_DROP_DATABASE
+	queryRequest         = protocol.Request_QUERY
+	endStreamResponse    = protocol.Response_END_STREAM
+	queryResponse        = protocol.Response_QUERY
+	heartbeatResponse    = protocol.Response_HEARTBEAT
+	explainQueryResponse = protocol.Response_EXPLAIN_QUERY
+	write                = protocol.Request_WRITE
 )
 
 type SeriesWriter interface {
@@ -77,7 +78,7 @@ func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterC
 }
 
 func (self *CoordinatorImpl) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error) {
-	log.Debug("COORD: RunQuery: ", queryString)
+	log.Debug("COORD: RunQuery: %s", queryString)
 	// don't let a panic pass beyond RunQuery
 	defer recoverFunc(database, queryString)
 
@@ -254,14 +255,17 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
 
 	go func() {
 		for {
-			res := <-responseChan
-			if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse {
+			response := <-responseChan
+
+			if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
 				writer.Close()
 				seriesClosed <- true
 				return
 			}
-			if res.Series != nil && len(res.Series.Points) > 0 {
-				writer.Write(res.Series)
+			if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
+				if response.Series != nil && len(response.Series.Points) > 0 {
+					writer.Write(response.Series)
+				}
 			}
 		}
 	}()
@@ -278,6 +282,7 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
 	responses := make([]chan *protocol.Response, 0)
 	for _, shard := range shards {
 		responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
+		// query each shard for its data and stream the results to the query processor
 		go shard.Query(querySpec, responseChan)
 		responses = append(responses, responseChan)
 	}
@@ -296,20 +301,27 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
 			break
 		}
 
-		// if we don't have a processor, yield the point to the writer
-		if processor == nil {
-			log.Debug("WRITING: ", len(response.Series.Points))
-			seriesWriter.Write(response.Series)
-			log.Debug("WRITING (done)")
+		if response.Series == nil || len(response.Series.Points) == 0 {
+			log.Debug("Series has no points, continue")
 			continue
 		}
-		// if the data wasn't aggregated at the shard level, aggregate
-		// the data here
-		if response.Series != nil {
-			log.Debug("YIELDING: ", len(response.Series.Points))
-			processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series)
-		}
+		// if we don't have a processor, yield the series to the writer;
+		// that happens when the shard took care of the query itself and
+		// the client gets its points from the passthrough engine
+		if processor != nil {
+			// the data wasn't aggregated at the shard level, so
+			// aggregate it here
+			log.Debug("YIELDING: %d points", len(response.Series.Points))
+			processor.YieldSeries(response.Series)
+			continue
+		}
+
+		// For an EXPLAIN query we don't write the actual points (of
+		// response.Type Query) to the client
+		if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
+			seriesWriter.Write(response.Series)
+		}
 		}
 		log.Debug("DONE: shard: ", shards[i].String())
 	}
diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go
index 02d4325358..be8f1542b5 100644
--- a/src/coordinator/protobuf_request_handler.go
+++ b/src/coordinator/protobuf_request_handler.go
@@ -58,7 +58,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
 	// the query should always parse correctly since it was parsed at the originating server.
 	queries, err := parser.ParseQuery(*request.Query)
 	if err != nil || len(queries) < 1 {
-		log.Error("Erorr parsing query: ", err)
+		log.Error("Error parsing query: ", err)
 		errorMsg := fmt.Sprintf("Cannot find user %s", *request.UserName)
 		response := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &errorMsg, RequestId: request.Id}
 		self.WriteResponse(conn, response)
@@ -80,6 +80,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
 	}
 
 	shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
+
 	querySpec := parser.NewQuerySpec(user, *request.Database, query)
 
 	responseChan := make(chan *protocol.Response)
diff --git a/src/datastore/leveldb_shard.go b/src/datastore/leveldb_shard.go
index ee1135c3ec..03a1ab320f 100644
--- a/src/datastore/leveldb_shard.go
+++ b/src/datastore/leveldb_shard.go
@@ -281,8 +281,12 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
 
 		if len(seriesOutgoing.Points) >= self.pointBatchSize {
 			for _, alias := range aliases {
-				_alias := alias
-				if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
+				series := &protocol.Series{
+					Name:   proto.String(alias),
+					Fields: fieldNames,
+					Points: seriesOutgoing.Points,
+				}
+				if !processor.YieldSeries(series) {
 					shouldContinue = false
 				}
 			}
@@ -296,9 +300,9 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
 
 	//Yield remaining data
 	for _, alias := range aliases {
-		_alias := alias
-		log.Debug("Final Flush %s", _alias)
-		if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
+		log.Debug("Final Flush %s", alias)
+		series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points}
+		if !processor.YieldSeries(series) {
 			log.Debug("Cancelled...")
 		}
 	}
diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go
index 45a00bdbf9..654f15429f 100644
--- a/src/engine/aggregator.go
+++ b/src/engine/aggregator.go
@@ -1,6 +1,6 @@
 package engine
 
-import (
+import (
 	"common"
 	"fmt"
 	"math"
@@ -554,7 +554,7 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{}
 	if timestamps == nil {
 		timestamps = make(map[interface{}]int64)
 		self.timestamps[series] = timestamps
-	}
+	}
 	if self.duration != nil {
 		timestamps[group] = *p.GetTimestampInMicroseconds() / *self.duration * *self.duration
 	} else {
@@ -571,18 +571,19 @@ func (self *TimestampAggregator) AggregateSeries(series string, group interface{
 	}
 	if len(s.Points) > 0 {
 		if self.duration != nil {
-			timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() / *self.duration * *self.duration
+			timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds() / *self.duration * *self.duration
 		} else {
-			timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds()
+			timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds()
 		}
 	}
 	return nil
 }
+
 /*
 //TODO: to be optimized
 func (self *TimestampAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error {
 	//log.Error("Timestamp: ", len(s.Points))
-	for _, p := range s.Points {
+	for _, p := range s.Points {
 		//log.Error("Point: ", p)
 		self.AggregatePoint(series, group, p)
 	}
@@ -614,12 +615,9 @@ func NewTimestampAggregator(query *parser.SelectQuery, _ *parser.Value) (Aggrega
 
 	var durationPtr *int64
 
-	//log.Error("Duration: ", duration)
-
 	if duration != nil {
 		newDuration := int64(*duration / time.Microsecond)
 		durationPtr = &newDuration
-		// log.Error("Woohoo! ", durationPtr)
", durationPtr) } return &TimestampAggregator{ diff --git a/src/engine/engine.go b/src/engine/engine.go index 60d4c6f68b..803146faca 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -29,17 +29,26 @@ type QueryEngine struct { pointsRange map[string]*PointRange groupBy *parser.GroupByClause aggregateYield func(*protocol.Series) error + explain bool + + // query statistics + runStartTime float64 + runEndTime float64 + pointsRead int64 + pointsWritten int64 + shardId int + shardLocal bool } +var ( + endStreamResponse = protocol.Response_END_STREAM + explainQueryResponse = protocol.Response_EXPLAIN_QUERY +) + const ( POINT_BATCH_SIZE = 64 ) -var ( - responseQuery = protocol.Response_QUERY - responseEndStream = protocol.Response_END_STREAM -) - // distribute query and possibly do the merge/join before yielding the points func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error { // see if this is a merge query @@ -69,11 +78,30 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo where: query.GetWhereCondition(), limiter: NewLimiter(limit), responseChan: responseChan, - seriesToPoints: make(map[string]*protocol.Series), + seriesToPoints: make(map[string]*protocol.Series), + // stats stuff + explain: query.IsExplainQuery(), + runStartTime: 0, + runEndTime: 0, + pointsRead: 0, + pointsWritten: 0, + shardId: 0, + shardLocal: false, //that really doesn't matter if it is not EXPLAIN query + } + + if queryEngine.explain { + queryEngine.runStartTime = float64(time.Now().UnixNano()) / float64(time.Millisecond) } yield := func(series *protocol.Series) error { - response := &protocol.Response{Type: &responseQuery, Series: series} + var response *protocol.Response + + if queryEngine.explain { + //TODO: We may not have to send points, just count them + queryEngine.pointsWritten += int64(len(series.Points)) + } + + response = &protocol.Response{Type: &queryResponse, Series: series} responseChan <- response return nil } @@ -93,6 +121,12 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo return queryEngine, nil } +// Shard will call this method for EXPLAIN query +func (self *QueryEngine) SetShardInfo(shardId int, shardLocal bool) { + self.shardId = shardId + self.shardLocal = shardLocal +} + // Returns false if the query should be stopped (either because of limit or error) func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, point *protocol.Point) (shouldContinue bool) { shouldContinue = true @@ -107,10 +141,20 @@ func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, poi } series.Points = append(series.Points, point) + if self.explain { + self.pointsRead++ + } + + fmt.Printf("self.seriesToPoints: %#v\n", self.seriesToPoints) return shouldContinue } -func (self *QueryEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) (shouldContinue bool) { +func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldContinue bool) { + if self.explain { + self.pointsRead += int64(len(seriesIncoming.Points)) + } + seriesName := seriesIncoming.GetName() + self.seriesToPoints[seriesName] = &protocol.Series{Name: &seriesName, Fields: seriesIncoming.Fields} return self.yieldSeriesData(seriesIncoming) } @@ -169,6 +213,8 @@ func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, er } func (self *QueryEngine) Close() { + fmt.Printf("Closing: %#v\n", self.seriesToPoints) + for _, 
 		if len(series.Points) == 0 {
 			continue
 		}
@@ -182,6 +228,7 @@ func (self *QueryEngine) Close() {
 			Name:   series.Name,
 			Fields: series.Fields,
 		}
+		fmt.Printf("yielding empty series for %s\n", series.GetName())
 		err = self.yield(s)
 		if err != nil {
 			break
@@ -191,7 +238,14 @@ func (self *QueryEngine) Close() {
 	if self.isAggregateQuery {
 		self.runAggregates()
 	}
-	response := &protocol.Response{Type: &responseEndStream}
+
+	if self.explain {
+		self.runEndTime = float64(time.Now().UnixNano()) / float64(time.Millisecond)
+		log.Debug("QueryEngine: %.3f R:%d W:%d", self.runEndTime-self.runStartTime, self.pointsRead, self.pointsWritten)
+
+		self.SendQueryStats()
+	}
+	response := &protocol.Response{Type: &endStreamResponse}
 	if err != nil {
 		message := err.Error()
 		response.ErrorMessage = &message
@@ -199,6 +253,40 @@ func (self *QueryEngine) Close() {
 	self.responseChan <- response
 }
 
+func (self *QueryEngine) SendQueryStats() {
+	timestamp := time.Now().UnixNano() / int64(time.Microsecond)
+
+	runTime := self.runEndTime - self.runStartTime
+	points := []*protocol.Point{}
+	pointsRead := self.pointsRead
+	pointsWritten := self.pointsWritten
+	shardId := int64(self.shardId)
+	shardLocal := self.shardLocal
+	engineName := "QueryEngine"
+
+	point := &protocol.Point{
+		Values: []*protocol.FieldValue{
+			&protocol.FieldValue{StringValue: &engineName},
+			&protocol.FieldValue{Int64Value: &shardId},
+			&protocol.FieldValue{BoolValue: &shardLocal},
+			&protocol.FieldValue{DoubleValue: &runTime},
+			&protocol.FieldValue{Int64Value: &pointsRead},
+			&protocol.FieldValue{Int64Value: &pointsWritten},
+		},
+		Timestamp: &timestamp,
+	}
+	points = append(points, point)
+
+	seriesName := "explain query"
+	series := &protocol.Series{
+		Name:   &seriesName,
+		Fields: []string{"engine_name", "shard_id", "shard_local", "run_time", "points_read", "points_written"},
+		Points: points,
+	}
+	response := &protocol.Response{Type: &explainQueryResponse, Series: series}
+	self.responseChan <- response
+}
+
 func containsArithmeticOperators(query *parser.SelectQuery) bool {
 	for _, column := range query.GetColumnNames() {
 		if column.Type == parser.ValueExpression {
@@ -612,3 +700,7 @@ func (self *QueryEngine) executeArithmeticQuery(query *parser.SelectQuery, yield
 		return nil
 	})
 }
+
+func (self *QueryEngine) GetName() string {
+	return "QueryEngine"
+}
diff --git a/src/engine/limiter.go b/src/engine/limiter.go
index f96d48cdf1..11f0fe83d7 100644
--- a/src/engine/limiter.go
+++ b/src/engine/limiter.go
@@ -1,7 +1,7 @@
 package engine
 
 import (
-	"protocol"
+	"protocol"
 )
 
 type Limiter struct {
@@ -18,10 +18,10 @@ func NewLimiter(limit int) *Limiter {
 	}
 }
 
-func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) {
+func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) {
 	if self.shouldLimit {
-		// if the limit is 0, stop returning any points
-		limit := self.limitForSeries(*series.Name)
+		// if the limit is 0, stop returning any points
+		limit := self.limitForSeries(*series.Name)
 		defer func() { self.limits[*series.Name] = limit }()
 		if limit == 0 {
 			series.Points = nil
@@ -33,7 +33,7 @@ func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) {
 			series.Points = series.Points[0:sliceTo]
 			limit = 0
 		}
-	}
+	}
 }
 
 func (self *Limiter) hitLimit(seriesName string) bool {
diff --git a/src/engine/list_series_engine.go b/src/engine/list_series_engine.go
index 99b36afb6b..753fbd7faf 100644
--- a/src/engine/list_series_engine.go
+++ b/src/engine/list_series_engine.go
@@ -9,8 +9,7 @@ const (
 )
 
 var (
-	queryResponse     = protocol.Response_QUERY
-	endStreamResponse = protocol.Response_END_STREAM
+	queryResponse = protocol.Response_QUERY
 )
 
 type ListSeriesEngine struct {
@@ -42,7 +41,7 @@ func (self *ListSeriesEngine) YieldPoint(seriesName *string, columnNames []strin
 	return true
 }
 
-func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []string, seriesIncoming *protocol.Series) bool {
+func (self *ListSeriesEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
 	if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE {
 		self.responseChan <- self.response
 		self.response = &protocol.Response{
@@ -50,7 +49,7 @@ func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []stri
 			MultiSeries: make([]*protocol.Series, 0),
 		}
 	}
-	self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesName})
+	self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesIncoming.Name})
 	return true
 }
 
@@ -61,3 +60,11 @@ func (self *ListSeriesEngine) Close() {
 	response := &protocol.Response{Type: &endStreamResponse}
 	self.responseChan <- response
 }
+
+func (self *ListSeriesEngine) SetShardInfo(shardId int, shardLocal bool) {
+	//EXPLAIN doesn't work with this query
+}
+
+func (self *ListSeriesEngine) GetName() string {
+	return "ListSeriesEngine"
+}
diff --git a/src/engine/merge.go b/src/engine/merge.go
index 779f092139..6e8b17720e 100644
--- a/src/engine/merge.go
+++ b/src/engine/merge.go
@@ -1,6 +1,7 @@
 package engine
 
 import (
+	"fmt"
 	"parser"
 	"protocol"
 )
@@ -16,6 +17,7 @@ func getJoinYield(query *parser.SelectQuery, yield func(*protocol.Series) error)
 	name := table1 + "_join_" + table2
 
 	return mergeYield(table1, table2, false, query.Ascending, func(s *protocol.Series) error {
+		fmt.Printf("join series: %d\n", len(s.Points))
 		if *s.Name == table1 {
 			lastPoint1 = s.Points[len(s.Points)-1]
 			if lastFields1 == nil {
@@ -269,6 +271,8 @@ func mergeYield(table1, table2 string, modifyValues bool, ascending bool, yield
 	}
 
 	return func(p *protocol.Series) error {
+		fmt.Printf("series: %v\n", len(p.Points))
+
 		state.updateState(p)
 
 		if err := state.flushIfNecessary(yield); err != nil {
diff --git a/src/engine/passthrough_engine.go b/src/engine/passthrough_engine.go
index d45b9ef57c..2576323080 100644
--- a/src/engine/passthrough_engine.go
+++ b/src/engine/passthrough_engine.go
@@ -12,6 +12,15 @@ type PassthroughEngine struct {
 	response            *protocol.Response
 	maxPointsInResponse int
 	limiter             *Limiter
+	responseType        *protocol.Response_Type
+
+	// query statistics
+	runStartTime  float64
+	runEndTime    float64
+	pointsRead    int64
+	pointsWritten int64
+	shardId       int
+	shardLocal    bool
 }
 
 func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine {
@@ -19,14 +28,24 @@ func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInRespo
 }
 
 func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine {
-	return &PassthroughEngine{
+	passthroughEngine := &PassthroughEngine{
 		responseChan:        responseChan,
 		maxPointsInResponse: maxPointsInResponse,
 		limiter:             NewLimiter(limit),
+		responseType:        &queryResponse,
+		runStartTime:        0,
+		runEndTime:          0,
+		pointsRead:          0,
+		pointsWritten:       0,
+		shardId:             0,
+		shardLocal:          false, // only meaningful for an EXPLAIN query
 	}
+
+	return passthroughEngine
 }
 
 func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
+	self.responseType = &queryResponse
 	series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}
 	self.limiter.calculateLimitAndSlicePoints(series)
 	if len(series.Points) == 0 {
@@ -35,19 +54,19 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri
 
 	if self.response == nil {
 		self.response = &protocol.Response{
-			Type:   &queryResponse,
+			Type:   self.responseType,
 			Series: series,
 		}
-	} else if self.response.Series.Name != seriesName {
+	} else if *self.response.Series.Name != *seriesName {
 		self.responseChan <- self.response
 		self.response = &protocol.Response{
-			Type:   &queryResponse,
+			Type:   self.responseType,
 			Series: series,
 		}
 	} else if len(self.response.Series.Points) > self.maxPointsInResponse {
 		self.responseChan <- self.response
 		self.response = &protocol.Response{
-			Type:   &queryResponse,
+			Type:   self.responseType,
 			Series: series,
 		}
 	} else {
@@ -56,54 +75,45 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri
 	return !self.limiter.hitLimit(*seriesName)
 }
 
-func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) bool {
+func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
 	log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points))
-/*
-	seriesCopy := &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)}
-	for _, point := range seriesIncoming.Points {
-		seriesCopy.Points = append(seriesCopy.Points, point)
+	if *seriesIncoming.Name == "explain query" {
+		self.responseType = &explainQueryResponse
+		log.Debug("Response type changed to EXPLAIN_QUERY")
+	} else {
+		self.responseType = &queryResponse
 	}
-*/
-	//log.Debug("PT Copied %d %d", len(seriesIncoming.Points), POINT_BATCH_SIZE)
+
 	self.limiter.calculateLimitAndSlicePoints(seriesIncoming)
 	if len(seriesIncoming.Points) == 0 {
 		log.Error("Not sent == 0")
 		return false
-	}
+	}
 
-	//log.Debug("PassthroughEngine", seriesCopy)
-	/*
-		self.response = &protocol.Response{
-			Type:   &queryResponse,
-			Series: seriesIncoming,
-		}
-		self.responseChan <- self.response
-	*/
 	if self.response == nil {
 		self.response = &protocol.Response{
-			Type:   &queryResponse,
+			Type:   self.responseType,
 			Series: seriesIncoming,
 		}
-	} else if *self.response.Series.Name != *seriesName {
+	} else if self.response.Series.GetName() != seriesIncoming.GetName() {
 		self.responseChan <- self.response
 		self.response = &protocol.Response{
-			Type:   &queryResponse,
+			Type:   self.responseType,
 			Series: seriesIncoming,
 		}
 	} else if len(self.response.Series.Points) > self.maxPointsInResponse {
 		self.responseChan <- self.response
 		self.response = &protocol.Response{
-			Type:   &queryResponse,
+			Type:   self.responseType,
 			Series: seriesIncoming,
 		}
 	} else {
 		self.response.Series.Points = append(self.response.Series.Points, seriesIncoming.Points...)
 	}
-	return !self.limiter.hitLimit(*seriesName)
+	return !self.limiter.hitLimit(seriesIncoming.GetName())
 	//return true
 }
 
-
 func (self *PassthroughEngine) Close() {
 	if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil {
 		self.responseChan <- self.response
@@ -111,3 +121,11 @@ func (self *PassthroughEngine) Close() {
 	response := &protocol.Response{Type: &endStreamResponse}
 	self.responseChan <- response
 }
+
+func (self *PassthroughEngine) SetShardInfo(shardId int, shardLocal bool) {
+	//EXPLAIN doesn't really work with this query (yet?)
+}
+
+func (self *PassthroughEngine) GetName() string {
+	return "PassthroughEngine"
+}
diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go
index 2f8a75a0f1..9d14697c27 100644
--- a/src/integration/benchmark_test.go
+++ b/src/integration/benchmark_test.go
@@ -298,6 +298,177 @@ func (self *IntegrationSuite) TestShortPasswords(c *C) {
 	c.Assert(resp.StatusCode, Equals, http.StatusOK)
 }
 
+func (self *IntegrationSuite) TestExplainsWithPassthrough(c *C) {
+	data := `
+  [{
+    "points": [
+        ["val1", 2],
+        ["val1", 3]
+    ],
+    "name": "test_explain_passthrough",
+    "columns": ["val_1", "val_2"]
+  }]`
+	c.Assert(self.server.WriteData(data), IsNil)
+	bs, err := self.server.RunQuery("explain select val_1 from test_explain_passthrough where time > now() - 1h", "m")
+	c.Assert(err, IsNil)
+	series := []*SerializedSeries{}
+	err = json.Unmarshal(bs, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 1)
+	c.Assert(series[0].Name, Equals, "explain query")
+	c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
+	c.Assert(series[0].Points, HasLen, 1)
+	c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
+	c.Assert(series[0].Points[0][5], Equals, float64(2.0))
+	c.Assert(series[0].Points[0][6], Equals, float64(2.0))
+}
+
+func (self *IntegrationSuite) TestExplainsWithPassthroughAndLimit(c *C) {
+	points := []string{}
+	for i := 0; i < 101; i++ {
+		points = append(points, fmt.Sprintf(`["val1", %d]`, i))
+	}
+
+	data := fmt.Sprintf(`
+  [{
+    "points": [%s],
+    "name": "test_explain_passthrough_limit",
+    "columns": ["val_1", "val_2"]
+  }]`, strings.Join(points, ","))
+
+	c.Assert(self.server.WriteData(data), IsNil)
+	bs, err := self.server.RunQuery("explain select val_1 from test_explain_passthrough_limit where time > now() - 1h limit 1", "m")
+	c.Assert(err, IsNil)
+	series := []*SerializedSeries{}
+	err = json.Unmarshal(bs, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 1)
+	c.Assert(series[0].Name, Equals, "explain query")
+	c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
+	c.Assert(series[0].Points, HasLen, 1)
+	c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
+
+	// we can read at most point-batch-size points, which is set to 100
+	// by default
+	c.Assert(series[0].Points[0][5], Equals, float64(100.0))
+	c.Assert(series[0].Points[0][6], Equals, float64(1.0))
+}
+
+func (self *IntegrationSuite) TestExplainsWithNonLocalAggregator(c *C) {
+	data := `
+  [{
+    "points": [
+        ["val1", 2],
+        ["val1", 3],
+        ["val1", 4]
+    ],
+    "name": "test_explain_non_local",
+    "columns": ["val_1", "val_2"]
+  }]`
+	c.Assert(self.server.WriteData(data), IsNil)
+	bs, err := self.server.RunQuery("explain select count(val_1) from test_explain_non_local where time > now() - 1h", "m")
+	c.Assert(err, IsNil)
+	series := []*SerializedSeries{}
+	err = json.Unmarshal(bs, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 1)
+	c.Assert(series[0].Name, Equals, "explain query")
+	c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
+	c.Assert(series[0].Points, HasLen, 1)
+	c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
+	c.Assert(series[0].Points[0][5], Equals, float64(3.0))
+	c.Assert(series[0].Points[0][6], Equals, float64(1.0))
+}
+
+func (self *IntegrationSuite) TestExplainsWithNonLocalAggregatorAndRegex(c *C) {
+	data := `
+  [{
+    "points": [
+        ["val1", 2],
+        ["val1", 3],
+        ["val1", 4]
+    ],
+    "name": "test_explain_non_local_regex",
+    "columns": ["val_1", "val_2"]
+  }]`
+	c.Assert(self.server.WriteData(data), IsNil)
+	bs, err := self.server.RunQuery("explain select count(val_1) from /.*test_explain_non_local_regex.*/ where time > now() - 1h", "m")
+	c.Assert(err, IsNil)
+	series := []*SerializedSeries{}
+	err = json.Unmarshal(bs, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 1)
+	c.Assert(series[0].Name, Equals, "explain query")
+	c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
+	c.Assert(series[0].Points, HasLen, 1)
+	c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
+	c.Assert(series[0].Points[0][5], Equals, float64(3.0))
+	c.Assert(series[0].Points[0][6], Equals, float64(1.0))
+}
+
+func (self *IntegrationSuite) TestExplainsWithLocalAggregator(c *C) {
+	data := `
+  [{
+    "points": [
+        ["val1", 2],
+        ["val1", 3],
+        ["val1", 4]
+    ],
+    "name": "test_local_aggregator",
+    "columns": ["val_1", "val_2"]
+  }]`
+	c.Assert(self.server.WriteData(data), IsNil)
+	bs, err := self.server.RunQuery("explain select count(val_1) from test_local_aggregator group by time(1h) where time > now() - 1h", "m")
+	c.Assert(err, IsNil)
+	series := []*SerializedSeries{}
+	err = json.Unmarshal(bs, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 1)
+	c.Assert(series[0].Name, Equals, "explain query")
+	c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
+	c.Assert(series[0].Points, HasLen, 1)
+	c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
+	c.Assert(series[0].Points[0][5], Equals, float64(3.0))
+	c.Assert(series[0].Points[0][6], Equals, float64(1.0))
+}
+
+func (self *IntegrationSuite) TestExplainsWithLocalAggregatorAndRegex(c *C) {
+	data := `
+[
+  {
+    "points": [
+        ["val1", 2],
+        ["val1", 3],
+        ["val1", 4]
+    ],
+    "name": "test_local_aggregator_regex_1",
+    "columns": ["val_1", "val_2"]
+  },
+  {
+    "points": [
+        ["val1", 2],
+        ["val1", 3],
+        ["val1", 4]
+    ],
+    "name": "test_local_aggregator_regex_2",
+    "columns": ["val_1", "val_2"]
+  }
+]`
+	c.Assert(self.server.WriteData(data), IsNil)
+	bs, err := self.server.RunQuery("explain select count(val_1) from /.*test_local_aggregator_regex.*/ group by time(1h) where time > now() - 1h", "m")
+	c.Assert(err, IsNil)
+	series := []*SerializedSeries{}
+	err = json.Unmarshal(bs, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 1)
+	c.Assert(series[0].Name, Equals, "explain query")
+	c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
+	c.Assert(series[0].Points, HasLen, 1)
+	c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
+	c.Assert(series[0].Points[0][5], Equals, float64(6.0))
+	c.Assert(series[0].Points[0][6], Equals, float64(2.0))
+}
+
 func (self *IntegrationSuite) TestMedians(c *C) {
 	for i := 0; i < 3; i++ {
 		err := self.server.WriteData(fmt.Sprintf(`
diff --git a/src/integration/test_config1.toml b/src/integration/test_config1.toml
index 9ffd56e598..39fc7ae86f 100644
--- a/src/integration/test_config1.toml
+++ b/src/integration/test_config1.toml
@@ -7,7 +7,7 @@
 
 [logging]
 # logging level can be one of "debug", "info", "warn" or "error"
-level = "info"
+level = "debug"
 file = "/tmp/influxdb/test/1/influxdb.log"
 
 # Configure the admin server
diff --git a/src/integration/test_config2.toml b/src/integration/test_config2.toml
index 7b4be21198..59d0052fa7 100644
--- a/src/integration/test_config2.toml
+++ b/src/integration/test_config2.toml
@@ -7,7 +7,7 @@
 
 [logging]
 # logging level can be one of "debug", "info", "warn" or "error"
-level = "info"
+level = "debug"
 file = "/tmp/influxdb/test/2/influxdb.log"
 
 # Configure the admin server
diff --git a/src/integration/test_config3.toml b/src/integration/test_config3.toml
index b295ffce51..4f7fd822f2 100644
--- a/src/integration/test_config3.toml
+++ b/src/integration/test_config3.toml
@@ -7,7 +7,7 @@
 
 [logging]
 # logging level can be one of "debug", "info", "warn" or "error"
-level = "info"
+level = "debug"
 file = "/tmp/influxdb/test/3/influxdb.log"
 
 # Configure the admin server
diff --git a/src/parser/parser.go b/src/parser/parser.go
index 065fa835f5..4b588673f6 100644
--- a/src/parser/parser.go
+++ b/src/parser/parser.go
@@ -150,6 +150,7 @@ type SelectQuery struct {
 	IntoClause *IntoClause
 	Limit      int
 	Ascending  bool
+	Explain    bool
 }
 
 type ListType int
@@ -203,6 +204,10 @@ func (self *Query) IsListQuery() bool {
 	return self.ListQuery != nil
 }
 
+func (self *Query) IsExplainQuery() bool {
+	return self.SelectQuery != nil && self.SelectQuery.Explain
+}
+
 func (self *Query) IsListSeriesQuery() bool {
 	return self.ListQuery != nil && self.ListQuery.Type == Series
 }
@@ -219,6 +224,10 @@ func (self *SelectQuery) GetColumnNames() []*Value {
 	return self.ColumnNames
 }
 
+func (self *SelectQuery) IsExplainQuery() bool {
+	return self.Explain
+}
+
 func (self *SelectQuery) IsSinglePointQuery() bool {
 	w := self.GetWhereCondition()
 	if w == nil {
@@ -617,6 +626,7 @@ func parseSelectQuery(queryString string, q *C.select_query) (*SelectQuery, erro
 		SelectDeleteCommonQuery: basicQuery,
 		Limit:                   int(limit),
 		Ascending:               q.ascending != 0,
+		Explain:                 q.explain != 0,
 	}
 
 	// get the column names
diff --git a/src/parser/parser_test.go b/src/parser/parser_test.go
index e9112d7dfb..054b43a69e 100644
--- a/src/parser/parser_test.go
+++ b/src/parser/parser_test.go
@@ -31,6 +31,23 @@ func (self *QueryParserSuite) TestInvalidFromClause(c *C) {
 	c.Assert(err, ErrorMatches, ".*\\$undefined.*")
 }
 
+func (self *QueryParserSuite) TestInvalidExplainQueries(c *C) {
+	query := "explain select foo, baz group by time(1d)"
+	_, err := ParseQuery(query)
+
+	c.Assert(err, NotNil)
+}
+
+func (self *QueryParserSuite) TestExplainQueries(c *C) {
+	query := "explain select foo, bar from baz group by time(1d)"
+	queries, err := ParseQuery(query)
+
+	c.Assert(err, IsNil)
+	c.Assert(queries, HasLen, 1)
+	c.Assert(queries[0].SelectQuery, NotNil)
+	c.Assert(queries[0].SelectQuery.IsExplainQuery(), Equals, true)
+}
+
 func (self *QueryParserSuite) TestParseBasicSelectQuery(c *C) {
 	for _, query := range []string{
 		"select value from t where c = '5';",
diff --git a/src/parser/query.lex b/src/parser/query.lex
index fbf5ba95ad..56790fce22 100644
--- a/src/parser/query.lex
+++ b/src/parser/query.lex
@@ -58,6 +58,7 @@ static int yycolumn = 1;
 "where"              { BEGIN(INITIAL); return WHERE; }
 "as"                 { return AS; }
 "select"             { return SELECT; }
+"explain"            { return EXPLAIN; }
 "delete"             { return DELETE; }
 "drop series"        { return DROP_SERIES; }
 "drop"               { return DROP; }
diff --git a/src/parser/query.yacc b/src/parser/query.yacc
index f894fbc83c..cef7733e8f 100644
--- a/src/parser/query.yacc
+++ b/src/parser/query.yacc
@@ -74,7 +74,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
 %lex-param {void *scanner}
 
 // define types of tokens (terminals)
-%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES
+%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES EXPLAIN
 %token STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME INTO_NAME REGEX_OP
 %token NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION
@@ -105,6 +105,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
 %type DROP_SERIES_QUERY
 %type SELECT_QUERY
 %type DROP_QUERY
+%type EXPLAIN_QUERY
 
 // the initial token
 %start ALL_QUERIES
@@ -176,6 +177,12 @@ QUERY:
         {
           $$ = calloc(1, sizeof(query));
          $$->list_continuous_queries_query = TRUE;
        }
+        |
+        EXPLAIN_QUERY
+        {
+          $$ = calloc(1, sizeof(query));
+          $$->select_query = $1;
+        }
 
 DROP_QUERY:
         DROP CONTINUOUS_QUERY INT_VALUE
@@ -200,6 +207,13 @@ DROP_SERIES_QUERY:
           $$->name = $2;
         }
 
+EXPLAIN_QUERY:
+        EXPLAIN SELECT_QUERY
+        {
+          $$ = $2;
+          $$->explain = TRUE;
+        }
+
 SELECT_QUERY:
         SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE
         {
@@ -211,6 +225,7 @@ SELECT_QUERY:
           $$->limit = $6.limit;
          $$->ascending = $6.ascending;
          $$->into_clause = $7;
+          $$->explain = FALSE;
        }
        |
        SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE
        {
@@ -223,6 +238,7 @@ SELECT_QUERY:
           $$->limit = $6.limit;
          $$->ascending = $6.ascending;
          $$->into_clause = $7;
+          $$->explain = FALSE;
        }
 
 LIMIT_AND_ORDER_CLAUSES:
diff --git a/src/parser/query_spec.go b/src/parser/query_spec.go
index 113957e831..0c53c1a812 100644
--- a/src/parser/query_spec.go
+++ b/src/parser/query_spec.go
@@ -109,6 +109,13 @@ func (self *QuerySpec) IsSinglePointQuery() bool {
 	return false
 }
 
+func (self *QuerySpec) IsExplainQuery() bool {
+	if self.query.SelectQuery != nil {
+		return self.query.SelectQuery.IsExplainQuery()
+	}
+	return false
+}
+
 func (self *QuerySpec) SelectQuery() *SelectQuery {
 	return self.query.SelectQuery
 }
diff --git a/src/parser/query_types.h b/src/parser/query_types.h
index c9d5c4775a..63c5665440 100644
--- a/src/parser/query_types.h
+++ b/src/parser/query_types.h
@@ -88,6 +88,7 @@ typedef struct {
   condition *where_condition;
   int limit;
   char ascending;
+  char explain;
 } select_query;
 
 typedef struct {
@@ -124,4 +125,3 @@ void free_error (error *error);
 
 // this is the api that is used in GO
 query parse_query(char *const query_s);
 void close_query (query *q);
-
diff --git a/src/parser/test_memory_leaks.sh b/src/parser/test_memory_leaks.sh
index b25c7aff64..9f1833b42c 100755
--- a/src/parser/test_memory_leaks.sh
+++ b/src/parser/test_memory_leaks.sh
@@ -8,6 +8,9 @@ int main(int argc, char **argv) {
   query q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time>now()-1d;");
   close_query(&q);
 
+  q = parse_query("explain select users.events group_by user_email,time(1h) where time>now()-1d;");
+  close_query(&q);
+
   // test freeing on error
   q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time >> now()-1d;");
   close_query(&q);
diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto
index 2a0ce6c2c4..d4ae863e0d 100644
--- a/src/protocol/protocol.proto
+++ b/src/protocol/protocol.proto
@@ -57,6 +57,7 @@ message Response {
     // Access denied also serves as an end of stream response
     ACCESS_DENIED = 8;
     HEARTBEAT = 9;
+    EXPLAIN_QUERY = 10;
   }
 
   enum ErrorCode {
     REQUEST_TOO_LARGE = 1;
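
Note for reviewers (not part of the patch): the shape of the EXPLAIN response is easiest to see end to end. Below is a minimal, hypothetical client-side sketch of consuming the single "explain query" series that `SendQueryStats` emits. `SerializedSeries` mirrors the JSON shape the integration tests above unmarshal into (its field tags are an assumption), and the column order — time first, then the six stats fields — follows the assertions in `benchmark_test.go`. `run_time` is in milliseconds, since the engine records `float64(time.Now().UnixNano()) / float64(time.Millisecond)`.

```go
package main

import "fmt"

// SerializedSeries mirrors the JSON the HTTP API returns and the
// integration tests unmarshal into; the exact field tags are an
// assumption for this sketch.
type SerializedSeries struct {
	Name    string          `json:"name"`
	Columns []string        `json:"columns"`
	Points  [][]interface{} `json:"points"`
}

// printExplain walks an "explain query" series produced by
// QueryEngine.SendQueryStats: one point per engine, whose columns are
// time, engine_name, shard_id, shard_local, run_time, points_read,
// points_written (the time column is prepended by the API).
func printExplain(series []*SerializedSeries) {
	for _, s := range series {
		if s.Name != "explain query" {
			continue
		}
		for _, p := range s.Points {
			// p[0] is the timestamp; the rest follow the field order
			// defined in SendQueryStats.
			fmt.Printf("engine=%v shard=%v local=%v run_time=%vms read=%v written=%v\n",
				p[1], p[2], p[3], p[4], p[5], p[6])
		}
	}
}

func main() {
	// Example shape of a response to:
	//   explain select count(val_1) from test_local_aggregator group by time(1h) ...
	// The concrete values here are illustrative only.
	series := []*SerializedSeries{{
		Name:    "explain query",
		Columns: []string{"time", "engine_name", "shard_id", "shard_local", "run_time", "points_read", "points_written"},
		Points:  [][]interface{}{{1382400000000, "QueryEngine", 1, true, 0.42, 3.0, 1.0}},
	}}
	printExplain(series)
}
```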