more fixes for issue #341

this patch uses a channel of response channels instead of slice of
response channels to create a pipeline instead of batches. In other
words before this patch we processed shardConcurrentLimit shards first,
then processed the next shardConcurrentLimit. With this patch we
constantly have shardConcurrentLimit in the pipeline, as soon as we're
done with one shard we start querying a new shard and so on. This
provides more parallelism and cleaner design.
pull/310/head
John Shahid 2014-04-03 13:44:58 -04:00
parent d39646a8d6
commit 1967629f16
1 changed files with 70 additions and 24 deletions

View File

@ -308,10 +308,13 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryProcessor,
writer SeriesWriter,
isExplainQuery bool,
channels []<-chan *protocol.Response) (err error) {
for _, responseChan := range channels {
for {
response := <-responseChan
errors chan<- error,
channels <-chan (<-chan *protocol.Response)) {
defer close(errors)
for responseChan := range channels {
for response := range responseChan {
//log.Debug("GOT RESPONSE: ", response.Type, response.Series)
log.Debug("GOT RESPONSE: ", response.Type)
@ -320,7 +323,9 @@ func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryPro
break
}
err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
err := common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
log.Error("Error while executing query: %s", err)
errors <- err
return
}
@ -346,47 +351,88 @@ func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryPro
writer.Write(response.Series)
}
}
// once we're done with a response channel signal queryShards to
// start querying a new shard
errors <- nil
}
return
}
func (self *CoordinatorImpl) queryShards(querySpec *parser.QuerySpec, shards []*cluster.ShardData,
errors <-chan error,
responseChannels chan<- (<-chan *protocol.Response)) error {
defer close(responseChannels)
for i := 0; i < len(shards); i++ {
// readFromResposneChannels will insert an error if an error
// occured while reading the response. This should immediately
// stop reading from shards
err := <-errors
if err != nil {
return err
}
shard := shards[i]
responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
// We query shards for data and stream them to query processor
log.Debug("QUERYING: shard: ", i, shard.String())
go shard.Query(querySpec, responseChan)
responseChannels <- responseChan
}
return nil
}
func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
if err != nil {
return err
}
defer func() {
if processor != nil {
processor.Close()
<-seriesClosed
} else {
seriesWriter.Close()
}
}()
shardConcurrentLimit := self.config.ConcurrentShardQueryLimit
if self.shouldQuerySequentially(shards, querySpec) {
log.Debug("Querying shards sequentially")
shardConcurrentLimit = 1
}
log.Debug("Shard concurrent limit: ", shardConcurrentLimit)
for i := 0; i < len(shards); i += shardConcurrentLimit {
responses := make([]<-chan *protocol.Response, 0, shardConcurrentLimit)
for j := 0; j < shardConcurrentLimit && i+j < len(shards); j++ {
shard := shards[i+j]
responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
// We query shards for data and stream them to query processor
log.Debug("QUERYING: shard: ", i+j, shard.String())
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}
errors := make(chan error, shardConcurrentLimit)
for i := 0; i < shardConcurrentLimit; i++ {
errors <- nil
}
responseChannels := make(chan (<-chan *protocol.Response), shardConcurrentLimit)
err := self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), responses)
if err != nil {
log.Error("Reading responses from channels returned an error: %s", err)
return err
go self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), errors, responseChannels)
err = self.queryShards(querySpec, shards, errors, responseChannels)
// make sure we read the rest of the errors and responses
for _err := range errors {
if err == nil {
err = _err
}
}
if processor != nil {
processor.Close()
<-seriesClosed
return err
for responsechan := range responseChannels {
for response := range responsechan {
if response.GetType() != endStreamResponse {
continue
}
if response.ErrorMessage != nil && err == nil {
err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
}
break
}
}
seriesWriter.Close()
return err
}