Fix write buffer replays to properly play requests in order and not drop any mid-replay.
parent
115d5d16b8
commit
64383b547f
|
@ -22,6 +22,7 @@ type Writer interface {
|
|||
}
|
||||
|
||||
func NewWriteBuffer(writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer {
|
||||
log.Info("Initializing write buffer with buffer size of %d", bufferSize)
|
||||
buff := &WriteBuffer{
|
||||
writer: writer,
|
||||
wal: wal,
|
||||
|
@ -42,6 +43,7 @@ func (self *WriteBuffer) Write(request *protocol.Request) {
|
|||
case self.writes <- request:
|
||||
return
|
||||
default:
|
||||
log.Info("Write buffer full, pausing that shit")
|
||||
select {
|
||||
case self.stoppedWrites <- *request.RequestNumber:
|
||||
return
|
||||
|
@ -66,7 +68,7 @@ func (self *WriteBuffer) write(request *protocol.Request) {
|
|||
attempts := 0
|
||||
for {
|
||||
self.shardIds[*request.ShardId] = true
|
||||
requestNumber := request.GetRequestNumber()
|
||||
requestNumber := *request.RequestNumber
|
||||
err := self.writer.Write(request)
|
||||
if err == nil {
|
||||
self.wal.Commit(requestNumber, self.serverId)
|
||||
|
@ -83,24 +85,54 @@ func (self *WriteBuffer) write(request *protocol.Request) {
|
|||
|
||||
func (self *WriteBuffer) replayAndRecover(missedRequest uint32) {
|
||||
for {
|
||||
log.Info("REPLAY: Replaying dropped requests...")
|
||||
// empty out the buffer before the replay so new writes can buffer while we're replaying
|
||||
channelLen := len(self.writes)
|
||||
for i := 0; i < channelLen; i++ {
|
||||
<-self.writes
|
||||
var req *protocol.Request
|
||||
|
||||
// if req is nil, this is the first run through the replay. Start from the start of the write queue
|
||||
if req == nil {
|
||||
for i := 0; i < channelLen; i++ {
|
||||
r := <-self.writes
|
||||
if req == nil {
|
||||
req = r
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Info("REPLAY: Emptied out channel")
|
||||
shardIds := make([]uint32, 0)
|
||||
for shardId, _ := range self.shardIds {
|
||||
shardIds = append(shardIds, shardId)
|
||||
}
|
||||
self.wal.RecoverServerFromRequestNumber(missedRequest, shardIds, func(request *protocol.Request, shardId uint32) error {
|
||||
|
||||
log.Info("REPLAY: Shards: ", shardIds)
|
||||
self.wal.RecoverServerFromRequestNumber(*req.RequestNumber, shardIds, func(request *protocol.Request, shardId uint32) error {
|
||||
req = request
|
||||
request.ShardId = &shardId
|
||||
self.write(request)
|
||||
return nil
|
||||
})
|
||||
|
||||
log.Info("REPLAY: Emptying out reqeusts from buffer that we've already replayed")
|
||||
RequestLoop:
|
||||
for {
|
||||
select {
|
||||
case newReq := <-self.writes:
|
||||
if *newReq.RequestNumber == *req.RequestNumber {
|
||||
break RequestLoop
|
||||
}
|
||||
default:
|
||||
log.Error("REPLAY: Got to the end of the write buffer without getting to the last written request.")
|
||||
break RequestLoop
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("REPLAY: done.")
|
||||
|
||||
// now make sure that no new writes were dropped. If so, do the replay again from this place.
|
||||
select {
|
||||
case missedRequest = <-self.stoppedWrites:
|
||||
case <-self.stoppedWrites:
|
||||
log.Info("REPLAY: Buffer backed up while replaying, going again.")
|
||||
continue
|
||||
default:
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue