diff --git a/src/wal/index.go b/src/wal/index.go index 3e543d9355..c3e61adb85 100644 --- a/src/wal/index.go +++ b/src/wal/index.go @@ -26,21 +26,17 @@ func (self *index) addEntry(startRequestNumber, size uint32, currentOffset uint6 self.CurrentOffset = currentOffset } -func (self *index) findOffsetBlock(requestNumber uint32) func(int) bool { +func (self *index) findOffsetBlock(order RequestNumberOrder, requestNumber uint32) func(int) bool { return func(i int) bool { // The returned function must satisfy `f(i) => f(i+1)`, meaning if // for index i f returns true, then it must return true for every // index greater than i. sort.Search will return the smallest // index satisfying f - if self.Entries[i].StartRequestNumber > requestNumber { - return true - } - - return false + return order.isAfter(self.Entries[i].StartRequestNumber, requestNumber) } } -func (self *index) requestOffset(requestNumber uint32) uint64 { +func (self *index) requestOffset(order RequestNumberOrder, requestNumber uint32) uint64 { numberOfEntries := len(self.Entries) if numberOfEntries == 0 { logger.Info("no index entries, assuming beginning of the file") @@ -48,15 +44,15 @@ func (self *index) requestOffset(requestNumber uint32) uint64 { } firstEntry := self.Entries[0] - if requestNumber < firstEntry.StartRequestNumber { + if order.isBeforeOrEqual(requestNumber, firstEntry.StartRequestNumber) { return firstEntry.StartOffset } lastEntry := self.Entries[numberOfEntries-1] - if requestNumber >= lastEntry.StartRequestNumber { + if order.isAfterOrEqual(requestNumber, lastEntry.StartRequestNumber) { return lastEntry.StartOffset } - index := sort.Search(numberOfEntries, self.findOffsetBlock(requestNumber)) + index := sort.Search(numberOfEntries, self.findOffsetBlock(order, requestNumber)) return self.Entries[index-1].StartOffset } diff --git a/src/wal/log.go b/src/wal/log.go index f7d687a3c2..308056eff1 100644 --- a/src/wal/log.go +++ b/src/wal/log.go @@ -104,6 +104,7 @@ func (self *log) recover() error { if err != nil { return err } + defer bookmark.Close() if err := self.state.read(bookmark); err != nil { return err } @@ -123,7 +124,7 @@ func (self *log) recover() error { stopChan := make(chan struct{}) go func() { - self.replayFromFileLocation(self.file, map[uint32]struct{}{}, 0, replayChan, stopChan) + self.replayFromFileLocation(self.file, map[uint32]struct{}{}, replayChan, stopChan) }() for { @@ -230,7 +231,7 @@ func (self *log) dupLogFile() (*os.File, error) { // replay requests starting at the given requestNumber and for the // given shard ids. Return all requests if shardIds is empty -func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32) (chan *replayRequest, chan struct{}) { +func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32, order RequestNumberOrder) (chan *replayRequest, chan struct{}) { // this channel needs to be buffered in case the last request in the // log file caused an error in the yield function stopChan := make(chan struct{}, 1) @@ -244,7 +245,7 @@ func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32 return } defer file.Close() - offset := self.state.Index.requestOffset(requestNumber) + offset := self.state.Index.requestOffset(order, requestNumber) logger.Debug("Replaying from file offset %d", offset) _, err = file.Seek(int64(offset), os.SEEK_SET) if err != nil { @@ -256,24 +257,61 @@ func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32 for _, shardId := range shardIds { shardIdsSet[shardId] = struct{}{} } - self.replayFromFileLocation(file, shardIdsSet, requestNumber, replayChan, stopChan) + if err := self.skipToRequest(file, requestNumber, order); err != nil { + sendOrStop(newErrorReplayRequest(err), replayChan, stopChan) + return + } + self.replayFromFileLocation(file, shardIdsSet, replayChan, stopChan) }() return replayChan, stopChan } +func (self *log) getNextHeader(file *os.File) (int, *entryHeader, error) { + hdr := &entryHeader{} + numberOfBytes, err := hdr.Read(file) + if err == io.EOF { + return 0, nil, nil + } + return numberOfBytes, hdr, err +} + +func (self *log) skipRequest(file *os.File, hdr *entryHeader) (err error) { + _, err = file.Seek(int64(hdr.length), os.SEEK_CUR) + return +} + +func (self *log) skipToRequest(file *os.File, requestNumber uint32, order RequestNumberOrder) error { + for { + n, hdr, err := self.getNextHeader(file) + if n == 0 { + // EOF + return nil + } + if err != nil { + return err + } + if order.isBefore(hdr.requestNumber, requestNumber) { + if err := self.skipRequest(file, hdr); err != nil { + return err + } + continue + } + // seek back to the beginning of the request header + _, err = file.Seek(int64(-n), os.SEEK_CUR) + return err + } +} + func (self *log) replayFromFileLocation(file *os.File, shardIdsSet map[uint32]struct{}, - requestNumber uint32, replayChan chan *replayRequest, stopChan chan struct{}) { defer func() { close(replayChan) }() for { - hdr := &entryHeader{} - numberOfBytes, err := hdr.Read(file) - - if err == io.EOF { - return + numberOfBytes, hdr, err := self.getNextHeader(file) + if numberOfBytes == 0 { + break } if err != nil { @@ -288,8 +326,8 @@ func (self *log) replayFromFileLocation(file *os.File, } else { _, ok = shardIdsSet[hdr.shardId] } - if !ok || hdr.requestNumber < requestNumber { - _, err = file.Seek(int64(hdr.length), os.SEEK_CUR) + if !ok { + err = self.skipRequest(file, hdr) if err != nil { sendOrStop(newErrorReplayRequest(err), replayChan, stopChan) return diff --git a/src/wal/request_number_order.go b/src/wal/request_number_order.go new file mode 100644 index 0000000000..3e9116f5f3 --- /dev/null +++ b/src/wal/request_number_order.go @@ -0,0 +1,8 @@ +package wal + +type RequestNumberOrder interface { + isAfter(uint32, uint32) bool + isAfterOrEqual(uint32, uint32) bool + isBefore(uint32, uint32) bool + isBeforeOrEqual(uint32, uint32) bool +} diff --git a/src/wal/sortable_log_slice.go b/src/wal/sortable_log_slice.go index cddf23b508..def72ca5b4 100644 --- a/src/wal/sortable_log_slice.go +++ b/src/wal/sortable_log_slice.go @@ -1,15 +1,20 @@ package wal -type sortableLogSlice []*log +type sortableLogSlice struct { + logFiles []*log + order RequestNumberOrder +} func (self sortableLogSlice) Len() int { - return len(self) + return len(self.logFiles) } func (self sortableLogSlice) Less(i, j int) bool { - return self[i].firstRequestNumber() < self[j].firstRequestNumber() + left := self.logFiles[i].firstRequestNumber() + right := self.logFiles[j].firstRequestNumber() + return self.order.isBefore(left, right) } func (self sortableLogSlice) Swap(i, j int) { - self[i], self[j] = self[j], self[i] + self.logFiles[i], self.logFiles[j] = self.logFiles[j], self.logFiles[i] } diff --git a/src/wal/state.go b/src/wal/state.go index 1a8fd798fd..ec1ef6e83c 100644 --- a/src/wal/state.go +++ b/src/wal/state.go @@ -21,11 +21,37 @@ type state struct { FileOffset int64 // the file offset at which this bookmark was created Index *index TotalNumberOfRequests int + FirstRequestNumber uint32 LargestRequestNumber uint32 ShardLastSequenceNumber map[uint32]uint64 ServerLastRequestNumber map[uint32]uint32 } +func (self *state) isAfter(left, right uint32) bool { + if left == right { + return false + } + if left >= self.FirstRequestNumber && right >= self.FirstRequestNumber { + return left > right + } + if left <= self.LargestRequestNumber && right <= self.LargestRequestNumber { + return left > right + } + return left <= self.LargestRequestNumber +} + +func (self *state) isAfterOrEqual(left, right uint32) bool { + return left == right || self.isAfter(left, right) +} + +func (self *state) isBefore(left, right uint32) bool { + return !self.isAfterOrEqual(left, right) +} + +func (self *state) isBeforeOrEqual(left, right uint32) bool { + return !self.isAfter(left, right) +} + func newState() *state { return &state{ Version: CURRENT_VERSION, @@ -64,6 +90,7 @@ func (self *state) getNextRequestNumber() uint32 { } func (self *state) continueFromState(state *state) { + self.FirstRequestNumber = state.FirstRequestNumber self.LargestRequestNumber = state.LargestRequestNumber self.ShardLastSequenceNumber = state.ShardLastSequenceNumber self.ServerLastRequestNumber = state.ServerLastRequestNumber diff --git a/src/wal/wal.go b/src/wal/wal.go index 40ef5b8c3f..f68c767cf2 100644 --- a/src/wal/wal.go +++ b/src/wal/wal.go @@ -66,8 +66,6 @@ func NewWAL(config *configuration.Configuration) (*WAL, error) { } // sort the logfiles by the first request number in the log - sort.Sort(sortableLogSlice(logFiles)) - wal := &WAL{ config: config, logFiles: logFiles, @@ -79,6 +77,9 @@ func NewWAL(config *configuration.Configuration) (*WAL, error) { // if we don't have any log files open yet, open a new one if len(logFiles) == 0 { _, err = wal.createNewLog() + } else { + state := logFiles[len(logFiles)-1].state + sort.Sort(sortableLogSlice{logFiles, state}) } go wal.processEntries() @@ -113,11 +114,12 @@ func (self *WAL) RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, func (self *WAL) RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error { var firstLogFile int + state := self.logFiles[len(self.logFiles)-1].state outer: for _, logFile := range self.logFiles[firstLogFile:] { logger.Info("Replaying from %s", logFile.file.Name()) count := 0 - ch, stopChan := logFile.replayFromRequestNumber(shardIds, requestNumber) + ch, stopChan := logFile.replayFromRequestNumber(shardIds, requestNumber, state) for { x := <-ch if x == nil { @@ -210,6 +212,7 @@ func (self *WAL) processCommitEntry(e *commitEntry) { logFile.close() logFile.delete() } + lastLogFile.state.FirstRequestNumber = self.logFiles[0].firstRequestNumber() e.confirmation <- &confirmation{0, nil} } @@ -250,9 +253,9 @@ func (self *WAL) AssignSequenceNumbersAndLog(request *protocol.Request, shard Sh return confirmation.requestNumber, confirmation.err } -func (self *WAL) doesLogFileContainRequest(requestNumber uint32) func(int) bool { +func (self *WAL) doesLogFileContainRequest(order RequestNumberOrder, requestNumber uint32) func(int) bool { return func(i int) bool { - if self.logFiles[i].firstRequestNumber() > requestNumber { + if order.isAfter(self.logFiles[i].firstRequestNumber(), requestNumber) { return true } return false @@ -262,12 +265,16 @@ func (self *WAL) doesLogFileContainRequest(requestNumber uint32) func(int) bool // returns the first log file that contains the given request number func (self *WAL) firstLogFile(requestNumber uint32) int { lengthLogFiles := len(self.logFiles) - if requestNumber >= self.logFiles[lengthLogFiles-1].firstRequestNumber() { + + lastLogFile := self.logFiles[len(self.logFiles)-1] + state := lastLogFile.state + + if state.isAfterOrEqual(requestNumber, lastLogFile.firstRequestNumber()) { return lengthLogFiles - 1 - } else if requestNumber <= self.logFiles[0].firstRequestNumber() { + } else if state.isAfterOrEqual(self.logFiles[0].firstRequestNumber(), requestNumber) { return 0 } - return sort.Search(lengthLogFiles, self.doesLogFileContainRequest(requestNumber)) - 1 + return sort.Search(lengthLogFiles, self.doesLogFileContainRequest(state, requestNumber)) - 1 } func (self *WAL) shouldRotateTheLogFile() bool { @@ -281,6 +288,7 @@ func (self *WAL) rotateTheLogFile() error { } lastLogFile := self.logFiles[len(self.logFiles)-1] + lastLogFile.state.FirstRequestNumber = self.logFiles[0].firstRequestNumber() err := lastLogFile.forceBookmark() if err != nil { return err diff --git a/src/wal/wal_test.go b/src/wal/wal_test.go index 3e21b5fbf2..2019573015 100644 --- a/src/wal/wal_test.go +++ b/src/wal/wal_test.go @@ -6,6 +6,7 @@ import ( "configuration" "fmt" . "launchpad.net/gocheck" + "math" "net/http" _ "net/http/pprof" "os" @@ -253,6 +254,76 @@ func (_ *WalSuite) TestReplay(c *C) { c.Assert(err, IsNil) } +// TODO: test roll over with multiple log files (this will test +// sorting of the log files) +func (_ *WalSuite) TestRequestNumberRollOver(c *C) { + wal := newWal(c) + firstRequestNumber := uint32(math.MaxUint32 - 10) + wal.logFiles[0].state.LargestRequestNumber = firstRequestNumber + var i uint32 + for i = 0; i < 20; i++ { + req := generateRequest(2) + n, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1}) + c.Assert(err, IsNil) + c.Assert(n, Equals, firstRequestNumber+i+1) + } + wal.Close() + requests := []*protocol.Request{} + wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error { + requests = append(requests, req) + return nil + }) + c.Assert(requests, HasLen, 20) +} + +func (_ *WalSuite) TestRequestNumberRollOverAcrossMultipleFiles(c *C) { + wal := newWal(c) + firstRequestNumber := uint32(math.MaxUint32 - 5000) + wal.logFiles[0].state.LargestRequestNumber = firstRequestNumber + var i uint32 + for i = 0; i < 20000; i++ { + req := generateRequest(2) + n, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1}) + c.Assert(err, IsNil) + c.Assert(n, Equals, firstRequestNumber+i+1) + } + wal.Close() + requests := []*protocol.Request{} + wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error { + requests = append(requests, req) + return nil + }) + c.Assert(len(requests), Equals, 20000) + wal, err := NewWAL(wal.config) + c.Assert(err, IsNil) + requests = []*protocol.Request{} + wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error { + requests = append(requests, req) + return nil + }) + c.Assert(len(requests), Equals, 20000) +} + +func (_ *WalSuite) TestRequestNumberRollOverAndIndexing(c *C) { + wal := newWal(c) + firstRequestNumber := uint32(math.MaxUint32 - 5000) + wal.logFiles[0].state.LargestRequestNumber = firstRequestNumber + var i uint32 + for i = 0; i < 20000; i++ { + req := generateRequest(2) + n, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1}) + c.Assert(err, IsNil) + c.Assert(n, Equals, firstRequestNumber+i+1) + } + wal.Close() + requests := []*protocol.Request{} + wal.RecoverServerFromRequestNumber(0, []uint32{1}, func(req *protocol.Request, shardId uint32) error { + requests = append(requests, req) + return nil + }) + c.Assert(len(requests), Equals, 15000) +} + func (_ *WalSuite) TestRecoveryFromCrash(c *C) { wal := newWal(c) req := generateRequest(2) @@ -376,10 +447,11 @@ func (_ *WalSuite) TestIndex(c *C) { } c.Assert(wal.logFiles[0].state.Index.Entries, HasLen, 3) - requestOffset := wal.logFiles[0].state.Index.requestOffset(2001) + state := wal.logFiles[len(wal.logFiles)-1].state + requestOffset := wal.logFiles[0].state.Index.requestOffset(state, 2001) c.Assert(requestOffset > 0, Equals, true) // request 2000 should be in the second block not the third block - c.Assert(requestOffset > wal.logFiles[0].state.Index.requestOffset(2000), Equals, true) + c.Assert(requestOffset > wal.logFiles[0].state.Index.requestOffset(state, 2000), Equals, true) } func (_ *WalSuite) TestSequenceNumberAssignment(c *C) {