diff --git a/src/cluster/write_buffer.go b/src/cluster/write_buffer.go index 025e3d710e..583ff5e625 100644 --- a/src/cluster/write_buffer.go +++ b/src/cluster/write_buffer.go @@ -1,9 +1,10 @@ package cluster import ( - log "code.google.com/p/log4go" "protocol" "time" + + log "code.google.com/p/log4go" ) // Acts as a buffer for writes @@ -86,31 +87,33 @@ func (self *WriteBuffer) write(request *protocol.Request) { func (self *WriteBuffer) replayAndRecover(missedRequest uint32) { var req *protocol.Request + + // empty out the buffer before the replay so new writes can buffer while we're replaying + channelLen := len(self.writes) + // This is the first run through the replay. Start from the start of the write queue + for i := 0; i < channelLen; i++ { + r := <-self.writes + if req == nil { + req = r + } + } + + if req == nil { + log.Error("%s: REPLAY: emptied channel, but no request set", self.writerInfo) + return + } + log.Debug("%s: REPLAY: Emptied out channel", self.writerInfo) + + shardIds := make([]uint32, 0) + for shardId, _ := range self.shardIds { + shardIds = append(shardIds, shardId) + } + + // while we're behind keep replaying from WAL for { log.Info("%s: REPLAY: Replaying dropped requests...", self.writerInfo) - // empty out the buffer before the replay so new writes can buffer while we're replaying - channelLen := len(self.writes) - // if req is nil, this is the first run through the replay. Start from the start of the write queue - if req == nil { - for i := 0; i < channelLen; i++ { - r := <-self.writes - if req == nil { - req = r - } - } - } - if req == nil { - log.Error("%s: REPLAY: emptied channel, but no request set", self.writerInfo) - return - } - log.Debug("%s: REPLAY: Emptied out channel", self.writerInfo) - shardIds := make([]uint32, 0) - for shardId, _ := range self.shardIds { - shardIds = append(shardIds, shardId) - } - - log.Debug("%s: REPLAY: from request %d. Shards: ", self.writerInfo, req.GetRequestNumber(), shardIds) + log.Debug("%s: REPLAY: from request %d. Shards: %v", self.writerInfo, req.GetRequestNumber(), shardIds) self.wal.RecoverServerFromRequestNumber(*req.RequestNumber, shardIds, func(request *protocol.Request, shardId uint32) error { log.Debug("%s: REPLAY: writing request number: %d", self.writerInfo, request.GetRequestNumber()) req = request