more refactoring and fix the case when the yield function errors out

pull/269/head
John Shahid 2014-02-17 13:51:45 -05:00
parent 52266f24d7
commit 5d3586de4b
3 changed files with 141 additions and 86 deletions

View File

@ -144,7 +144,7 @@ func (self *log) recover() error {
stopChan := make(chan struct{})
go func() {
self.replayFromFile(self.file, map[uint32]struct{}{}, 0, replayChan, stopChan)
self.replayFromFileLocation(self.file, map[uint32]struct{}{}, 0, replayChan, stopChan)
}()
for {
@ -291,43 +291,53 @@ func (self *log) dupLogFile() (*os.File, error) {
// replay requests starting at the given requestNumber and for the
// given shard ids. Return all requests if shardIds is empty
func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32) (chan *replayRequest, chan struct{}) {
stopChan := make(chan struct{})
// this channel needs to be buffered in case the last request in the
// log file caused an error in the yield function
stopChan := make(chan struct{}, 1)
replayChan := make(chan *replayRequest, 10)
go func() {
file, err := self.dupLogFile()
if err != nil {
replayChan <- newErrorReplayRequest(err)
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
close(replayChan)
return
}
defer file.Close()
offset := self.state.Index.requestOffset(requestNumber)
_, err = file.Seek(int64(offset), os.SEEK_SET)
if err != nil {
replayChan <- newErrorReplayRequest(err)
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
close(replayChan)
return
}
shardIdsSet := map[uint32]struct{}{}
for _, shardId := range shardIds {
shardIdsSet[shardId] = struct{}{}
}
self.replayFromFile(file, shardIdsSet, requestNumber, replayChan, stopChan)
self.replayFromFileLocation(file, shardIdsSet, requestNumber, replayChan, stopChan)
}()
return replayChan, stopChan
}
func (self *log) replayFromFile(file *os.File, shardIdsSet map[uint32]struct{}, requestNumber uint32, replayChan chan *replayRequest, stopChan chan struct{}) {
func (self *log) replayFromFileLocation(file *os.File,
shardIdsSet map[uint32]struct{},
requestNumber uint32,
replayChan chan *replayRequest,
stopChan chan struct{}) {
defer func() { close(replayChan) }()
for {
hdr := &entryHeader{}
_, err := hdr.Read(file)
if err == io.EOF {
close(replayChan)
return
}
if err != nil {
// TODO: the following line is all over the place. DRY
replayChan <- newErrorReplayRequest(err)
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
@ -337,19 +347,10 @@ func (self *log) replayFromFile(file *os.File, shardIdsSet map[uint32]struct{},
} else {
_, ok = shardIdsSet[hdr.shardId]
}
if !ok {
if !ok || hdr.requestNumber < requestNumber {
_, err = file.Seek(int64(hdr.length), os.SEEK_CUR)
if err != nil {
replayChan <- newErrorReplayRequest(err)
return
}
continue
}
if hdr.requestNumber < requestNumber {
_, err = file.Seek(int64(hdr.length), os.SEEK_CUR)
if err != nil {
replayChan <- newErrorReplayRequest(err)
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
continue
@ -358,24 +359,37 @@ func (self *log) replayFromFile(file *os.File, shardIdsSet map[uint32]struct{},
bytes := make([]byte, hdr.length)
read, err := self.file.Read(bytes)
if err != nil {
replayChan <- newErrorReplayRequest(err)
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
if uint32(read) != hdr.length {
replayChan <- newErrorReplayRequest(err)
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
req := &protocol.Request{}
err = req.Decode(bytes)
if err != nil {
replayChan <- newErrorReplayRequest(err)
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
replayRequest := &replayRequest{hdr.requestNumber, req, hdr.shardId, nil}
if sendOrStop(replayRequest, replayChan, stopChan) {
return
}
replayChan <- &replayRequest{hdr.requestNumber, req, hdr.shardId, nil}
}
}
func sendOrStop(req *replayRequest, replayChan chan *replayRequest, stopChan chan struct{}) bool {
select {
case replayChan <- req:
case <-stopChan:
return true
}
return false
}
func (self *log) forceBookmark(shutdown bool) error {
confirmationChan := make(chan *confirmation)
self.bookmarkChan <- &bookmarkEvent{shutdown, confirmationChan}
@ -411,3 +425,10 @@ func (self *log) internalBookmark() error {
self.state.RequestsSinceLastBookmark = 0
return nil
}
func (self *log) delete() {
filePath := path.Join(self.config.WalDir, fmt.Sprintf("bookmark.%d", self.suffix()))
os.Remove(filePath)
filePath = path.Join(self.config.WalDir, fmt.Sprintf("log.%d", self.suffix()))
os.Remove(filePath)
}

View File

@ -79,6 +79,68 @@ func (self *WAL) SetServerId(id uint32) {
self.serverId = id
}
// Marks a given request for a given server as committed
func (self *WAL) Commit(requestNumber uint32, server Server) error {
lastLogFile := self.logFiles[len(self.logFiles)-1]
lastLogFile.state.commitRequestNumber(server.Id(), requestNumber)
lowestCommitedRequestNumber := lastLogFile.state.LowestCommitedRequestNumber()
index := self.firstLogFile(lowestCommitedRequestNumber)
if index == 0 {
return nil
}
var unusedLogFiles []*log
unusedLogFiles, self.logFiles = self.logFiles[:index], self.logFiles[index:]
for _, logFile := range unusedLogFiles {
logFile.close()
logFile.delete()
}
return nil
}
// In the case where this server is running and another one in the cluster stops responding, at some point this server will have to just write
// requests to disk. When the downed server comes back up, it's this server's responsibility to send out any writes that were queued up. If
// the yield function returns nil then the request is committed.
func (self *WAL) RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error {
var firstLogFile int
outer:
for _, logFile := range self.logFiles[firstLogFile:] {
ch, stopChan := logFile.replayFromRequestNumber(shardIds, requestNumber)
for {
x := <-ch
if x == nil {
continue outer
}
if x.err != nil {
return x.err
}
if err := yield(x.request, x.shardId); err != nil {
stopChan <- struct{}{}
return err
}
}
close(stopChan)
}
return nil
}
func (self *WAL) Close() error {
for _, l := range self.logFiles {
if err := l.close(); err != nil {
return err
}
}
return nil
}
// PRIVATE functions
// creates a new log file using the next suffix and initializes its
// state with the state of the last log file
func (self *WAL) createNewLog() (*log, error) {
self.nextLogFileSuffix++
logFileName := path.Join(self.config.WalDir, fmt.Sprintf("log.%d", self.nextLogFileSuffix))
@ -126,30 +188,7 @@ func (self *WAL) AssignSequenceNumbersAndLog(request *protocol.Request, shard Sh
return requestNumber, nil
}
// Marks a given request for a given server as committed
func (self *WAL) Commit(requestNumber uint32, server Server) error {
lastLogFile := self.logFiles[len(self.logFiles)-1]
lastLogFile.state.commitRequestNumber(server.Id(), requestNumber)
lowestCommitedRequestNumber := lastLogFile.state.LowestCommitedRequestNumber()
index := self.firstLogFile(lowestCommitedRequestNumber)
if index == 0 {
return nil
}
var unusedLogFiles []*log
unusedLogFiles, self.logFiles = self.logFiles[:index], self.logFiles[index:]
for _, logFile := range unusedLogFiles {
logFile.close()
filePath := path.Join(self.config.WalDir, fmt.Sprintf("bookmark.%d", logFile.suffix()))
os.Remove(filePath)
filePath = path.Join(self.config.WalDir, fmt.Sprintf("log.%d", logFile.suffix()))
os.Remove(filePath)
}
return nil
}
func (self *WAL) getFirstLogFile(requestNumber uint32) func(int) bool {
func (self *WAL) doesLogFileContainRequest(requestNumber uint32) func(int) bool {
return func(i int) bool {
if self.logFiles[i].firstRequestNumber() > requestNumber {
return true
@ -166,42 +205,5 @@ func (self *WAL) firstLogFile(requestNumber uint32) int {
} else if requestNumber <= self.logFiles[0].firstRequestNumber() {
return 0
}
return sort.Search(lengthLogFiles, self.getFirstLogFile(requestNumber)) - 1
}
// In the case where this server is running and another one in the cluster stops responding, at some point this server will have to just write
// requests to disk. When the downed server comes back up, it's this server's responsibility to send out any writes that were queued up. If
// the yield function returns nil then the request is committed.
func (self *WAL) RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error {
var firstLogFile int
outer:
for _, logFile := range self.logFiles[firstLogFile:] {
ch, stopChan := logFile.replayFromRequestNumber(shardIds, requestNumber)
for {
x := <-ch
if x == nil {
continue outer
}
if x.err != nil {
return x.err
}
if err := yield(x.request, x.shardId); err != nil {
stopChan <- struct{}{}
return err
}
}
}
return nil
}
func (self *WAL) Close() error {
for _, l := range self.logFiles {
if err := l.close(); err != nil {
return err
}
}
return nil
return sort.Search(lengthLogFiles, self.doesLogFileContainRequest(requestNumber)) - 1
}

View File

@ -191,7 +191,7 @@ func (_ *WalSuite) TestAutoBookmarkAfterRecovery(c *C) {
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(1))
// close andn reopen the wal
// close and reopen the wal
wal.Close()
wal, err = NewWAL(wal.config)
c.Assert(err, IsNil)
@ -252,6 +252,38 @@ func (_ *WalSuite) TestReplay(c *C) {
c.Assert(err, IsNil)
}
func (_ *WalSuite) TestErrorInReplay(c *C) {
wal := newWal(c)
request := generateRequest(1)
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(1))
err = wal.RecoverServerFromRequestNumber(uint32(1), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
return fmt.Errorf("end replay")
})
c.Assert(err, NotNil)
}
func (_ *WalSuite) TestErrorInReplayWithManyRequests(c *C) {
wal := newWal(c)
for i := 1; i <= 100; i++ {
request := generateRequest(i)
_, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
}
count := 0
err := wal.RecoverServerFromRequestNumber(uint32(1), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
count++
if count > 50 {
return fmt.Errorf("end replay")
}
return nil
})
c.Assert(err, NotNil)
}
func (_ *WalSuite) TestIndexAfterRecovery(c *C) {
wal := newWal(c)
for i := 0; i < 1500; i++ {