From 1967629f16431340b97c6af19000fad803393609 Mon Sep 17 00:00:00 2001
From: John Shahid
Date: Thu, 3 Apr 2014 13:44:58 -0400
Subject: [PATCH] more fixes for issue #341

this patch uses a channel of response channels instead of a slice of
response channels to create a pipeline instead of batches. In other
words, before this patch we processed shardConcurrentLimit shards
first, then processed the next shardConcurrentLimit shards. With this
patch we constantly have shardConcurrentLimit shards in the pipeline:
as soon as we're done with one shard we start querying a new shard,
and so on. This provides more parallelism and a cleaner design.
---
 src/coordinator/coordinator.go | 94 +++++++++++++++++++++++++---------
 1 file changed, 70 insertions(+), 24 deletions(-)

diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go
index 1e9e800249..a94099217d 100644
--- a/src/coordinator/coordinator.go
+++ b/src/coordinator/coordinator.go
@@ -308,10 +308,13 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
 func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryProcessor,
 	writer SeriesWriter,
 	isExplainQuery bool,
-	channels []<-chan *protocol.Response) (err error) {
-	for _, responseChan := range channels {
-		for {
-			response := <-responseChan
+	errors chan<- error,
+	channels <-chan (<-chan *protocol.Response)) {
+
+	defer close(errors)
+
+	for responseChan := range channels {
+		for response := range responseChan {
 			//log.Debug("GOT RESPONSE: ", response.Type, response.Series)
 			log.Debug("GOT RESPONSE: ", response.Type)
 
@@ -320,7 +323,9 @@ func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryProcessor,
 					break
 				}
 
-				err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
+				err := common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
+				log.Error("Error while executing query: %s", err)
+				errors <- err
 				return
 			}
 
@@ -346,47 +351,88 @@ func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryProcessor,
 				writer.Write(response.Series)
 			}
 		}
+
+		// once we're done with a response channel, signal queryShards to
+		// start querying a new shard
+		errors <- nil
 	}
 	return
 }
 
+func (self *CoordinatorImpl) queryShards(querySpec *parser.QuerySpec, shards []*cluster.ShardData,
+	errors <-chan error,
+	responseChannels chan<- (<-chan *protocol.Response)) error {
+	defer close(responseChannels)
+
+	for i := 0; i < len(shards); i++ {
+		// readFromResposneChannels will insert an error if an error
+		// occurred while reading the response. This should immediately
+		// stop reading from shards
+		err := <-errors
+		if err != nil {
+			return err
+		}
+		shard := shards[i]
+		responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
+		// We query shards for data and stream them to query processor
+		log.Debug("QUERYING: shard: ", i, shard.String())
+		go shard.Query(querySpec, responseChan)
+		responseChannels <- responseChan
+	}
+
+	return nil
+}
+
 func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
 	shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
 	if err != nil {
 		return err
 	}
 
+	defer func() {
+		if processor != nil {
+			processor.Close()
+			<-seriesClosed
+		} else {
+			seriesWriter.Close()
+		}
+	}()
+
 	shardConcurrentLimit := self.config.ConcurrentShardQueryLimit
 	if self.shouldQuerySequentially(shards, querySpec) {
 		log.Debug("Querying shards sequentially")
 		shardConcurrentLimit = 1
 	}
 	log.Debug("Shard concurrent limit: ", shardConcurrentLimit)
-	for i := 0; i < len(shards); i += shardConcurrentLimit {
-		responses := make([]<-chan *protocol.Response, 0, shardConcurrentLimit)
-		for j := 0; j < shardConcurrentLimit && i+j < len(shards); j++ {
-			shard := shards[i+j]
-			responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
-			// We query shards for data and stream them to query processor
-			log.Debug("QUERYING: shard: ", i+j, shard.String())
-			go shard.Query(querySpec, responseChan)
-			responses = append(responses, responseChan)
-		}
+	errors := make(chan error, shardConcurrentLimit)
+	for i := 0; i < shardConcurrentLimit; i++ {
+		errors <- nil
+	}
+	responseChannels := make(chan (<-chan *protocol.Response), shardConcurrentLimit)
 
-		err := self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), responses)
-		if err != nil {
-			log.Error("Reading responses from channels returned an error: %s", err)
-			return err
+	go self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), errors, responseChannels)
+
+	err = self.queryShards(querySpec, shards, errors, responseChannels)
+
+	// make sure we read the rest of the errors and responses
+	for _err := range errors {
+		if err == nil {
+			err = _err
 		}
 	}
 
-	if processor != nil {
-		processor.Close()
-		<-seriesClosed
-		return err
+	for responsechan := range responseChannels {
+		for response := range responsechan {
+			if response.GetType() != endStreamResponse {
+				continue
+			}
+			if response.ErrorMessage != nil && err == nil {
+				err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
+			}
+			break
+		}
 	}
-	seriesWriter.Close()
 	return err
 }
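
The pipeline is easiest to see in isolation. Below is a minimal, self-contained Go sketch of the same pattern: a channel of per-shard response channels feeding a single reader, with an errors channel that doubles as a token bucket keeping at most shardConcurrentLimit queries in flight. The names used here (fetch, produce, consume, limit) are illustrative only and are not part of the coordinator code.

// Minimal sketch (not the coordinator code itself) of the pipeline this
// patch sets up: a producer keeps at most `limit` shard queries in flight,
// handing each per-shard result channel to a single consumer through a
// channel of channels; the consumer hands back a token (a nil error) once
// it has fully drained one result channel, which lets the producer start
// the next query. All names here are illustrative only.
package main

import (
	"fmt"
	"time"
)

// fetch simulates querying one shard and streaming its points.
func fetch(shard int, out chan<- string) {
	defer close(out)
	for p := 0; p < 3; p++ {
		time.Sleep(10 * time.Millisecond)
		out <- fmt.Sprintf("shard %d point %d", shard, p)
	}
}

// produce launches one query per shard, waiting for a token before each one.
func produce(shards int, errors <-chan error, results chan<- (<-chan string)) error {
	defer close(results)
	for i := 0; i < shards; i++ {
		if err := <-errors; err != nil {
			return err // the consumer hit an error; stop querying new shards
		}
		ch := make(chan string, 4)
		go fetch(i, ch)
		results <- ch
	}
	return nil
}

// consume drains each result channel in order and returns a token per channel.
func consume(errors chan<- error, results <-chan (<-chan string)) {
	defer close(errors)
	for ch := range results {
		for point := range ch {
			fmt.Println(point)
		}
		errors <- nil // done with this shard; let the producer start another
	}
}

func main() {
	const limit = 2 // plays the role of shardConcurrentLimit
	errors := make(chan error, limit)
	for i := 0; i < limit; i++ {
		errors <- nil // pre-load tokens so `limit` queries start immediately
	}
	results := make(chan (<-chan string), limit)

	go consume(errors, results)
	err := produce(5, errors, results)

	// wait for the consumer to finish, picking up any error it reported
	for e := range errors {
		if err == nil {
			err = e
		}
	}
	fmt.Println("done, err =", err)
}

The important property is that a token goes back only after a whole response channel has been drained, so a slow reader naturally throttles how many shard queries run at once, while a non-nil error on the errors channel stops new shard queries immediately.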