Merge pull request #357 from influxdb/fix-340-out-of-order-commits
Fix 340 out of order commitspull/293/merge
commit
c056cf5f47
|
@ -210,7 +210,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
|
|||
server.connection = self.connectionCreator(server.ProtobufConnectionString)
|
||||
server.Connect()
|
||||
}
|
||||
server.SetWriteBuffer(NewWriteBuffer(server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
|
||||
server.SetWriteBuffer(NewWriteBuffer(fmt.Sprintf("%d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
|
||||
server.StartHeartbeat()
|
||||
} else if !self.addedLocalServer {
|
||||
log.Info("Added the local server")
|
||||
|
@ -503,7 +503,7 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
|
|||
if server.connection == nil {
|
||||
server.connection = self.connectionCreator(server.ProtobufConnectionString)
|
||||
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
|
||||
server.SetWriteBuffer(NewWriteBuffer(server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
|
||||
server.SetWriteBuffer(NewWriteBuffer(fmt.Sprintf("server: %d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
|
||||
server.Connect()
|
||||
server.StartHeartbeat()
|
||||
}
|
||||
|
@ -911,7 +911,7 @@ func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32)
|
|||
}
|
||||
|
||||
func (self *ClusterConfiguration) RecoverFromWAL() error {
|
||||
self.shardStore.SetWriteBuffer(NewWriteBuffer(self.shardStore, self.wal, self.LocalServerId, self.config.LocalStoreWriteBufferSize))
|
||||
self.shardStore.SetWriteBuffer(NewWriteBuffer("local", self.shardStore, self.wal, self.LocalServerId, self.config.LocalStoreWriteBufferSize))
|
||||
var waitForAll sync.WaitGroup
|
||||
for _, server := range self.servers {
|
||||
waitForAll.Add(1)
|
||||
|
|
|
@ -15,14 +15,15 @@ type WriteBuffer struct {
|
|||
stoppedWrites chan uint32
|
||||
bufferSize int
|
||||
shardIds map[uint32]bool
|
||||
writerInfo string
|
||||
}
|
||||
|
||||
type Writer interface {
|
||||
Write(request *protocol.Request) error
|
||||
}
|
||||
|
||||
func NewWriteBuffer(writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer {
|
||||
log.Info("Initializing write buffer with buffer size of %d", bufferSize)
|
||||
func NewWriteBuffer(writerInfo string, writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer {
|
||||
log.Info("%s: Initializing write buffer with buffer size of %d", writerInfo, bufferSize)
|
||||
buff := &WriteBuffer{
|
||||
writer: writer,
|
||||
wal: wal,
|
||||
|
@ -31,6 +32,7 @@ func NewWriteBuffer(writer Writer, wal WAL, serverId uint32, bufferSize int) *Wr
|
|||
stoppedWrites: make(chan uint32, 1),
|
||||
bufferSize: bufferSize,
|
||||
shardIds: make(map[uint32]bool),
|
||||
writerInfo: writerInfo,
|
||||
}
|
||||
go buff.handleWrites()
|
||||
return buff
|
||||
|
@ -74,7 +76,7 @@ func (self *WriteBuffer) write(request *protocol.Request) {
|
|||
return
|
||||
}
|
||||
if attempts%100 == 0 {
|
||||
log.Error("WriteBuffer: error on write to server %d: %s", self.serverId, err)
|
||||
log.Error("%s: WriteBuffer: error on write to server %d: %s", self.writerInfo, self.serverId, err)
|
||||
}
|
||||
attempts += 1
|
||||
// backoff happens in the writer, just sleep for a small fixed amount of time before retrying
|
||||
|
@ -83,11 +85,11 @@ func (self *WriteBuffer) write(request *protocol.Request) {
|
|||
}
|
||||
|
||||
func (self *WriteBuffer) replayAndRecover(missedRequest uint32) {
|
||||
var req *protocol.Request
|
||||
for {
|
||||
log.Info("REPLAY: Replaying dropped requests...")
|
||||
log.Info("%s: REPLAY: Replaying dropped requests...", self.writerInfo)
|
||||
// empty out the buffer before the replay so new writes can buffer while we're replaying
|
||||
channelLen := len(self.writes)
|
||||
var req *protocol.Request
|
||||
|
||||
// if req is nil, this is the first run through the replay. Start from the start of the write queue
|
||||
if req == nil {
|
||||
|
@ -99,24 +101,25 @@ func (self *WriteBuffer) replayAndRecover(missedRequest uint32) {
|
|||
}
|
||||
}
|
||||
if req == nil {
|
||||
log.Error("REPLAY: emptied channel, but no request set")
|
||||
log.Error("%s: REPLAY: emptied channel, but no request set", self.writerInfo)
|
||||
return
|
||||
}
|
||||
log.Info("REPLAY: Emptied out channel")
|
||||
log.Debug("%s: REPLAY: Emptied out channel", self.writerInfo)
|
||||
shardIds := make([]uint32, 0)
|
||||
for shardId, _ := range self.shardIds {
|
||||
shardIds = append(shardIds, shardId)
|
||||
}
|
||||
|
||||
log.Info("REPLAY: Shards: ", shardIds)
|
||||
log.Debug("%s: REPLAY: from request %d. Shards: ", self.writerInfo, req.GetRequestNumber(), shardIds)
|
||||
self.wal.RecoverServerFromRequestNumber(*req.RequestNumber, shardIds, func(request *protocol.Request, shardId uint32) error {
|
||||
log.Debug("%s: REPLAY: writing request number: %d", self.writerInfo, request.GetRequestNumber())
|
||||
req = request
|
||||
request.ShardId = &shardId
|
||||
self.write(request)
|
||||
return nil
|
||||
})
|
||||
|
||||
log.Info("REPLAY: Emptying out reqeusts from buffer that we've already replayed")
|
||||
log.Info("%s: REPLAY: Emptying out reqeusts from buffer that we've already replayed", self.writerInfo)
|
||||
RequestLoop:
|
||||
for {
|
||||
select {
|
||||
|
@ -125,17 +128,17 @@ func (self *WriteBuffer) replayAndRecover(missedRequest uint32) {
|
|||
break RequestLoop
|
||||
}
|
||||
default:
|
||||
log.Error("REPLAY: Got to the end of the write buffer without getting to the last written request.")
|
||||
log.Error("%s: REPLAY: Got to the end of the write buffer without getting to the last written request.", self.writerInfo)
|
||||
break RequestLoop
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("REPLAY: done.")
|
||||
log.Info("%s: REPLAY: done.", self.writerInfo)
|
||||
|
||||
// now make sure that no new writes were dropped. If so, do the replay again from this place.
|
||||
select {
|
||||
case <-self.stoppedWrites:
|
||||
log.Info("REPLAY: Buffer backed up while replaying, going again.")
|
||||
log.Info("%s: REPLAY: Buffer backed up while replaying, going again.", self.writerInfo)
|
||||
continue
|
||||
default:
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue