diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 1e9e800249..a94099217d 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -308,10 +308,13 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryProcessor, writer SeriesWriter, isExplainQuery bool, - channels []<-chan *protocol.Response) (err error) { - for _, responseChan := range channels { - for { - response := <-responseChan + errors chan<- error, + channels <-chan (<-chan *protocol.Response)) { + + defer close(errors) + + for responseChan := range channels { + for response := range responseChan { //log.Debug("GOT RESPONSE: ", response.Type, response.Series) log.Debug("GOT RESPONSE: ", response.Type) @@ -320,7 +323,9 @@ func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryPro break } - err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) + err := common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) + log.Error("Error while executing query: %s", err) + errors <- err return } @@ -346,47 +351,88 @@ func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryPro writer.Write(response.Series) } } + + // once we're done with a response channel signal queryShards to + // start querying a new shard + errors <- nil } return } +func (self *CoordinatorImpl) queryShards(querySpec *parser.QuerySpec, shards []*cluster.ShardData, + errors <-chan error, + responseChannels chan<- (<-chan *protocol.Response)) error { + defer close(responseChannels) + + for i := 0; i < len(shards); i++ { + // readFromResposneChannels will insert an error if an error + // occured while reading the response. This should immediately + // stop reading from shards + err := <-errors + if err != nil { + return err + } + shard := shards[i] + responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) + // We query shards for data and stream them to query processor + log.Debug("QUERYING: shard: ", i, shard.String()) + go shard.Query(querySpec, responseChan) + responseChannels <- responseChan + } + + return nil +} + func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter) if err != nil { return err } + defer func() { + if processor != nil { + processor.Close() + <-seriesClosed + } else { + seriesWriter.Close() + } + }() + shardConcurrentLimit := self.config.ConcurrentShardQueryLimit if self.shouldQuerySequentially(shards, querySpec) { log.Debug("Querying shards sequentially") shardConcurrentLimit = 1 } log.Debug("Shard concurrent limit: ", shardConcurrentLimit) - for i := 0; i < len(shards); i += shardConcurrentLimit { - responses := make([]<-chan *protocol.Response, 0, shardConcurrentLimit) - for j := 0; j < shardConcurrentLimit && i+j < len(shards); j++ { - shard := shards[i+j] - responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) - // We query shards for data and stream them to query processor - log.Debug("QUERYING: shard: ", i+j, shard.String()) - go shard.Query(querySpec, responseChan) - responses = append(responses, responseChan) - } + errors := make(chan error, shardConcurrentLimit) + for i := 0; i < shardConcurrentLimit; i++ { + errors <- nil + } + responseChannels := make(chan (<-chan *protocol.Response), shardConcurrentLimit) - err := self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), responses) - if err != nil { - log.Error("Reading responses from channels returned an error: %s", err) - return err + go self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), errors, responseChannels) + + err = self.queryShards(querySpec, shards, errors, responseChannels) + + // make sure we read the rest of the errors and responses + for _err := range errors { + if err == nil { + err = _err } } - if processor != nil { - processor.Close() - <-seriesClosed - return err + for responsechan := range responseChannels { + for response := range responsechan { + if response.GetType() != endStreamResponse { + continue + } + if response.ErrorMessage != nil && err == nil { + err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) + } + break + } } - seriesWriter.Close() return err }