simplify the recovery logic

pull/429/head
John Shahid 2014-04-11 17:25:00 -04:00
parent d3eba5f4d3
commit d199866ae4
1 changed files with 26 additions and 23 deletions

View File

@ -1,9 +1,10 @@
package cluster
import (
log "code.google.com/p/log4go"
"protocol"
"time"
log "code.google.com/p/log4go"
)
// Acts as a buffer for writes
@ -86,31 +87,33 @@ func (self *WriteBuffer) write(request *protocol.Request) {
func (self *WriteBuffer) replayAndRecover(missedRequest uint32) {
var req *protocol.Request
// empty out the buffer before the replay so new writes can buffer while we're replaying
channelLen := len(self.writes)
// This is the first run through the replay. Start from the start of the write queue
for i := 0; i < channelLen; i++ {
r := <-self.writes
if req == nil {
req = r
}
}
if req == nil {
log.Error("%s: REPLAY: emptied channel, but no request set", self.writerInfo)
return
}
log.Debug("%s: REPLAY: Emptied out channel", self.writerInfo)
shardIds := make([]uint32, 0)
for shardId, _ := range self.shardIds {
shardIds = append(shardIds, shardId)
}
// while we're behind keep replaying from WAL
for {
log.Info("%s: REPLAY: Replaying dropped requests...", self.writerInfo)
// empty out the buffer before the replay so new writes can buffer while we're replaying
channelLen := len(self.writes)
// if req is nil, this is the first run through the replay. Start from the start of the write queue
if req == nil {
for i := 0; i < channelLen; i++ {
r := <-self.writes
if req == nil {
req = r
}
}
}
if req == nil {
log.Error("%s: REPLAY: emptied channel, but no request set", self.writerInfo)
return
}
log.Debug("%s: REPLAY: Emptied out channel", self.writerInfo)
shardIds := make([]uint32, 0)
for shardId, _ := range self.shardIds {
shardIds = append(shardIds, shardId)
}
log.Debug("%s: REPLAY: from request %d. Shards: ", self.writerInfo, req.GetRequestNumber(), shardIds)
log.Debug("%s: REPLAY: from request %d. Shards: %v", self.writerInfo, req.GetRequestNumber(), shardIds)
self.wal.RecoverServerFromRequestNumber(*req.RequestNumber, shardIds, func(request *protocol.Request, shardId uint32) error {
log.Debug("%s: REPLAY: writing request number: %d", self.writerInfo, request.GetRequestNumber())
req = request