fix more bugs in the wal

pull/319/head
John Shahid 2014-03-06 15:29:55 -05:00
parent 66c0f919c9
commit 417c23f8d9
4 changed files with 45 additions and 10 deletions

View File

@ -6,6 +6,8 @@ import (
type closeEntry struct {
confirmation chan *confirmation
// this is used for testing only
shouldBookmark bool
}
type commitEntry struct {

View File

@ -122,6 +122,7 @@ func (self *log) dupAndReplayFromOffset(shardIds []uint32, offset int64, rn uint
if err = self.skip(file, offset, rn); err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
close(replayChan)
return
}
shardIdsSet := map[uint32]struct{}{}
for _, shardId := range shardIds {
@ -242,7 +243,7 @@ func (self *log) replayFromFileLocation(file *os.File,
}
if uint32(read) != hdr.length {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
sendOrStop(newErrorReplayRequest(fmt.Errorf("expected to read %d but got %d instead", hdr.length, read)), replayChan, stopChan)
return
}
req := &protocol.Request{}

View File

@ -121,7 +121,10 @@ func (self *WAL) Commit(requestNumber uint32, serverId uint32) error {
}
func (self *WAL) RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error {
requestNumber := self.state.ServerLastRequestNumber[serverId]
requestNumber, ok := self.state.ServerLastRequestNumber[serverId]
if !ok {
requestNumber = uint32(self.state.FirstSuffix)
}
return self.RecoverServerFromRequestNumber(requestNumber, shardIds, yield)
}
@ -138,8 +141,15 @@ func (self *WAL) isInRange(requestNumber uint32) bool {
// requests to disk. When the downed server comes back up, it's this server's responsibility to send out any writes that were queued up. If
// the yield function returns nil then the request is committed.
func (self *WAL) RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error {
// don't replay if we don't have any log files yet
if len(self.logFiles) == 0 {
return nil
}
firstIndex := 0
firstOffset := int64(-1)
// find the log file from which replay will start if the request
// number is in range, otherwise replay from all log files
if self.isInRange(requestNumber) {
for idx, logIndex := range self.logIndex {
logger.Debug("Trying to find request %d in %s", requestNumber, self.logFiles[idx].file.Name())
@ -188,20 +198,30 @@ outer:
}
func (self *WAL) Close() error {
return self.closeCommon(true)
}
func (self *WAL) closeWithoutBookmarking() error {
return self.closeCommon(false)
}
func (self *WAL) closeCommon(shouldBookmark bool) error {
confirmationChan := make(chan *confirmation)
self.entries <- &closeEntry{confirmationChan}
self.entries <- &closeEntry{confirmationChan, shouldBookmark}
confirmation := <-confirmationChan
return confirmation.err
}
func (self *WAL) processClose() error {
func (self *WAL) processClose(shouldBookmark bool) error {
for idx, logFile := range self.logFiles {
logFile.syncFile()
logFile.close()
self.logIndex[idx].syncFile()
self.logIndex[idx].close()
}
self.bookmark()
if shouldBookmark {
self.bookmark()
}
return nil
}
@ -216,7 +236,7 @@ func (self *WAL) processEntries() {
case *appendEntry:
self.processAppendEntry(x)
case *closeEntry:
x.confirmation <- &confirmation{0, self.processClose()}
x.confirmation <- &confirmation{0, self.processClose(x.shouldBookmark)}
logger.Info("Closing wal")
return
default:
@ -319,7 +339,7 @@ func (self *WAL) createNewLog(firstRequestNumber uint32) (*log, error) {
func (self *WAL) openLog(logFileName string) (*log, *index, error) {
logger.Info("Opening log file %s", logFileName)
logFile, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR, 0644)
logFile, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
return nil, nil, err
}
@ -389,6 +409,7 @@ func (self *WAL) recover() error {
break
}
self.state.LargestRequestNumber = replayRequest.requestNumber
if err := replayRequest.err; err != nil {
return err
}
@ -403,6 +424,7 @@ func (self *WAL) recover() error {
self.requestsSinceLastIndex++
self.requestsSinceRotation++
logger.Debug("recovery requestsSinceLastIndex: %d, requestNumber: %d", self.requestsSinceLastIndex, replayRequest.request.GetRequestNumber())
logger.Debug("largestrequestnumber: %d\n", self.state.LargestRequestNumber)
if self.requestsSinceLastIndex < self.config.WalIndexAfterRequests {
continue

View File

@ -92,7 +92,7 @@ func (_ *WalSuite) TestRequestNumberAssignmentRecovery(c *C) {
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(1))
c.Assert(wal.Close(), IsNil)
c.Assert(wal.closeWithoutBookmarking(), IsNil)
wal, err = NewWAL(wal.config)
c.Assert(err, IsNil)
request = generateRequest(2)
@ -101,7 +101,7 @@ func (_ *WalSuite) TestRequestNumberAssignmentRecovery(c *C) {
c.Assert(id, Equals, uint32(2))
// test recovery from wal replay
c.Assert(wal.Close(), IsNil)
c.Assert(wal.closeWithoutBookmarking(), IsNil)
wal, err = NewWAL(wal.config)
c.Assert(err, IsNil)
request = generateRequest(2)
@ -320,6 +320,16 @@ func (_ *WalSuite) TestRequestNumberRollOverAndIndexing(c *C) {
c.Assert(len(requests), Equals, 15000)
}
func (_ *WalSuite) TestRecoveryAfterStartup(c *C) {
wal := newWal(c)
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(0, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(requests, HasLen, 0)
}
func (_ *WalSuite) TestRecoveryFromCrash(c *C) {
wal := newWal(c)
req := generateRequest(2)
@ -415,7 +425,7 @@ func (_ *WalSuite) TestIndexAfterRecovery(c *C) {
c.Assert(id, Equals, uint32(i+1))
}
wal.Close()
wal.closeWithoutBookmarking()
wal, err := NewWAL(wal.config)
c.Assert(err, IsNil)