fix #285. fixing some problems with req num roll over

pull/290/head
John Shahid 2014-02-27 16:02:55 -05:00
parent 0fd2a4c267
commit df2056de8a
7 changed files with 190 additions and 36 deletions

View File

@ -26,21 +26,17 @@ func (self *index) addEntry(startRequestNumber, size uint32, currentOffset uint6
self.CurrentOffset = currentOffset
}
func (self *index) findOffsetBlock(requestNumber uint32) func(int) bool {
func (self *index) findOffsetBlock(order RequestNumberOrder, requestNumber uint32) func(int) bool {
return func(i int) bool {
// The returned function must satisfy `f(i) => f(i+1)`, meaning if
// for index i f returns true, then it must return true for every
// index greater than i. sort.Search will return the smallest
// index satisfying f
if self.Entries[i].StartRequestNumber > requestNumber {
return true
}
return false
return order.isAfter(self.Entries[i].StartRequestNumber, requestNumber)
}
}
func (self *index) requestOffset(requestNumber uint32) uint64 {
func (self *index) requestOffset(order RequestNumberOrder, requestNumber uint32) uint64 {
numberOfEntries := len(self.Entries)
if numberOfEntries == 0 {
logger.Info("no index entries, assuming beginning of the file")
@ -48,15 +44,15 @@ func (self *index) requestOffset(requestNumber uint32) uint64 {
}
firstEntry := self.Entries[0]
if requestNumber < firstEntry.StartRequestNumber {
if order.isBeforeOrEqual(requestNumber, firstEntry.StartRequestNumber) {
return firstEntry.StartOffset
}
lastEntry := self.Entries[numberOfEntries-1]
if requestNumber >= lastEntry.StartRequestNumber {
if order.isAfterOrEqual(requestNumber, lastEntry.StartRequestNumber) {
return lastEntry.StartOffset
}
index := sort.Search(numberOfEntries, self.findOffsetBlock(requestNumber))
index := sort.Search(numberOfEntries, self.findOffsetBlock(order, requestNumber))
return self.Entries[index-1].StartOffset
}

View File

@ -104,6 +104,7 @@ func (self *log) recover() error {
if err != nil {
return err
}
defer bookmark.Close()
if err := self.state.read(bookmark); err != nil {
return err
}
@ -123,7 +124,7 @@ func (self *log) recover() error {
stopChan := make(chan struct{})
go func() {
self.replayFromFileLocation(self.file, map[uint32]struct{}{}, 0, replayChan, stopChan)
self.replayFromFileLocation(self.file, map[uint32]struct{}{}, replayChan, stopChan)
}()
for {
@ -230,7 +231,7 @@ func (self *log) dupLogFile() (*os.File, error) {
// replay requests starting at the given requestNumber and for the
// given shard ids. Return all requests if shardIds is empty
func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32) (chan *replayRequest, chan struct{}) {
func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32, order RequestNumberOrder) (chan *replayRequest, chan struct{}) {
// this channel needs to be buffered in case the last request in the
// log file caused an error in the yield function
stopChan := make(chan struct{}, 1)
@ -244,7 +245,7 @@ func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32
return
}
defer file.Close()
offset := self.state.Index.requestOffset(requestNumber)
offset := self.state.Index.requestOffset(order, requestNumber)
logger.Debug("Replaying from file offset %d", offset)
_, err = file.Seek(int64(offset), os.SEEK_SET)
if err != nil {
@ -256,24 +257,61 @@ func (self *log) replayFromRequestNumber(shardIds []uint32, requestNumber uint32
for _, shardId := range shardIds {
shardIdsSet[shardId] = struct{}{}
}
self.replayFromFileLocation(file, shardIdsSet, requestNumber, replayChan, stopChan)
if err := self.skipToRequest(file, requestNumber, order); err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
self.replayFromFileLocation(file, shardIdsSet, replayChan, stopChan)
}()
return replayChan, stopChan
}
func (self *log) getNextHeader(file *os.File) (int, *entryHeader, error) {
hdr := &entryHeader{}
numberOfBytes, err := hdr.Read(file)
if err == io.EOF {
return 0, nil, nil
}
return numberOfBytes, hdr, err
}
func (self *log) skipRequest(file *os.File, hdr *entryHeader) (err error) {
_, err = file.Seek(int64(hdr.length), os.SEEK_CUR)
return
}
func (self *log) skipToRequest(file *os.File, requestNumber uint32, order RequestNumberOrder) error {
for {
n, hdr, err := self.getNextHeader(file)
if n == 0 {
// EOF
return nil
}
if err != nil {
return err
}
if order.isBefore(hdr.requestNumber, requestNumber) {
if err := self.skipRequest(file, hdr); err != nil {
return err
}
continue
}
// seek back to the beginning of the request header
_, err = file.Seek(int64(-n), os.SEEK_CUR)
return err
}
}
func (self *log) replayFromFileLocation(file *os.File,
shardIdsSet map[uint32]struct{},
requestNumber uint32,
replayChan chan *replayRequest,
stopChan chan struct{}) {
defer func() { close(replayChan) }()
for {
hdr := &entryHeader{}
numberOfBytes, err := hdr.Read(file)
if err == io.EOF {
return
numberOfBytes, hdr, err := self.getNextHeader(file)
if numberOfBytes == 0 {
break
}
if err != nil {
@ -288,8 +326,8 @@ func (self *log) replayFromFileLocation(file *os.File,
} else {
_, ok = shardIdsSet[hdr.shardId]
}
if !ok || hdr.requestNumber < requestNumber {
_, err = file.Seek(int64(hdr.length), os.SEEK_CUR)
if !ok {
err = self.skipRequest(file, hdr)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return

View File

@ -0,0 +1,8 @@
package wal
type RequestNumberOrder interface {
isAfter(uint32, uint32) bool
isAfterOrEqual(uint32, uint32) bool
isBefore(uint32, uint32) bool
isBeforeOrEqual(uint32, uint32) bool
}

View File

@ -1,15 +1,20 @@
package wal
type sortableLogSlice []*log
type sortableLogSlice struct {
logFiles []*log
order RequestNumberOrder
}
func (self sortableLogSlice) Len() int {
return len(self)
return len(self.logFiles)
}
func (self sortableLogSlice) Less(i, j int) bool {
return self[i].firstRequestNumber() < self[j].firstRequestNumber()
left := self.logFiles[i].firstRequestNumber()
right := self.logFiles[j].firstRequestNumber()
return self.order.isBefore(left, right)
}
func (self sortableLogSlice) Swap(i, j int) {
self[i], self[j] = self[j], self[i]
self.logFiles[i], self.logFiles[j] = self.logFiles[j], self.logFiles[i]
}

View File

@ -21,11 +21,37 @@ type state struct {
FileOffset int64 // the file offset at which this bookmark was created
Index *index
TotalNumberOfRequests int
FirstRequestNumber uint32
LargestRequestNumber uint32
ShardLastSequenceNumber map[uint32]uint64
ServerLastRequestNumber map[uint32]uint32
}
func (self *state) isAfter(left, right uint32) bool {
if left == right {
return false
}
if left >= self.FirstRequestNumber && right >= self.FirstRequestNumber {
return left > right
}
if left <= self.LargestRequestNumber && right <= self.LargestRequestNumber {
return left > right
}
return left <= self.LargestRequestNumber
}
func (self *state) isAfterOrEqual(left, right uint32) bool {
return left == right || self.isAfter(left, right)
}
func (self *state) isBefore(left, right uint32) bool {
return !self.isAfterOrEqual(left, right)
}
func (self *state) isBeforeOrEqual(left, right uint32) bool {
return !self.isAfter(left, right)
}
func newState() *state {
return &state{
Version: CURRENT_VERSION,
@ -64,6 +90,7 @@ func (self *state) getNextRequestNumber() uint32 {
}
func (self *state) continueFromState(state *state) {
self.FirstRequestNumber = state.FirstRequestNumber
self.LargestRequestNumber = state.LargestRequestNumber
self.ShardLastSequenceNumber = state.ShardLastSequenceNumber
self.ServerLastRequestNumber = state.ServerLastRequestNumber

View File

@ -66,8 +66,6 @@ func NewWAL(config *configuration.Configuration) (*WAL, error) {
}
// sort the logfiles by the first request number in the log
sort.Sort(sortableLogSlice(logFiles))
wal := &WAL{
config: config,
logFiles: logFiles,
@ -79,6 +77,9 @@ func NewWAL(config *configuration.Configuration) (*WAL, error) {
// if we don't have any log files open yet, open a new one
if len(logFiles) == 0 {
_, err = wal.createNewLog()
} else {
state := logFiles[len(logFiles)-1].state
sort.Sort(sortableLogSlice{logFiles, state})
}
go wal.processEntries()
@ -113,11 +114,12 @@ func (self *WAL) RecoverServerFromLastCommit(serverId uint32, shardIds []uint32,
func (self *WAL) RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error {
var firstLogFile int
state := self.logFiles[len(self.logFiles)-1].state
outer:
for _, logFile := range self.logFiles[firstLogFile:] {
logger.Info("Replaying from %s", logFile.file.Name())
count := 0
ch, stopChan := logFile.replayFromRequestNumber(shardIds, requestNumber)
ch, stopChan := logFile.replayFromRequestNumber(shardIds, requestNumber, state)
for {
x := <-ch
if x == nil {
@ -210,6 +212,7 @@ func (self *WAL) processCommitEntry(e *commitEntry) {
logFile.close()
logFile.delete()
}
lastLogFile.state.FirstRequestNumber = self.logFiles[0].firstRequestNumber()
e.confirmation <- &confirmation{0, nil}
}
@ -250,9 +253,9 @@ func (self *WAL) AssignSequenceNumbersAndLog(request *protocol.Request, shard Sh
return confirmation.requestNumber, confirmation.err
}
func (self *WAL) doesLogFileContainRequest(requestNumber uint32) func(int) bool {
func (self *WAL) doesLogFileContainRequest(order RequestNumberOrder, requestNumber uint32) func(int) bool {
return func(i int) bool {
if self.logFiles[i].firstRequestNumber() > requestNumber {
if order.isAfter(self.logFiles[i].firstRequestNumber(), requestNumber) {
return true
}
return false
@ -262,12 +265,16 @@ func (self *WAL) doesLogFileContainRequest(requestNumber uint32) func(int) bool
// returns the first log file that contains the given request number
func (self *WAL) firstLogFile(requestNumber uint32) int {
lengthLogFiles := len(self.logFiles)
if requestNumber >= self.logFiles[lengthLogFiles-1].firstRequestNumber() {
lastLogFile := self.logFiles[len(self.logFiles)-1]
state := lastLogFile.state
if state.isAfterOrEqual(requestNumber, lastLogFile.firstRequestNumber()) {
return lengthLogFiles - 1
} else if requestNumber <= self.logFiles[0].firstRequestNumber() {
} else if state.isAfterOrEqual(self.logFiles[0].firstRequestNumber(), requestNumber) {
return 0
}
return sort.Search(lengthLogFiles, self.doesLogFileContainRequest(requestNumber)) - 1
return sort.Search(lengthLogFiles, self.doesLogFileContainRequest(state, requestNumber)) - 1
}
func (self *WAL) shouldRotateTheLogFile() bool {
@ -281,6 +288,7 @@ func (self *WAL) rotateTheLogFile() error {
}
lastLogFile := self.logFiles[len(self.logFiles)-1]
lastLogFile.state.FirstRequestNumber = self.logFiles[0].firstRequestNumber()
err := lastLogFile.forceBookmark()
if err != nil {
return err

View File

@ -6,6 +6,7 @@ import (
"configuration"
"fmt"
. "launchpad.net/gocheck"
"math"
"net/http"
_ "net/http/pprof"
"os"
@ -253,6 +254,76 @@ func (_ *WalSuite) TestReplay(c *C) {
c.Assert(err, IsNil)
}
// TODO: test roll over with multiple log files (this will test
// sorting of the log files)
func (_ *WalSuite) TestRequestNumberRollOver(c *C) {
wal := newWal(c)
firstRequestNumber := uint32(math.MaxUint32 - 10)
wal.logFiles[0].state.LargestRequestNumber = firstRequestNumber
var i uint32
for i = 0; i < 20; i++ {
req := generateRequest(2)
n, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(n, Equals, firstRequestNumber+i+1)
}
wal.Close()
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(requests, HasLen, 20)
}
func (_ *WalSuite) TestRequestNumberRollOverAcrossMultipleFiles(c *C) {
wal := newWal(c)
firstRequestNumber := uint32(math.MaxUint32 - 5000)
wal.logFiles[0].state.LargestRequestNumber = firstRequestNumber
var i uint32
for i = 0; i < 20000; i++ {
req := generateRequest(2)
n, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(n, Equals, firstRequestNumber+i+1)
}
wal.Close()
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(len(requests), Equals, 20000)
wal, err := NewWAL(wal.config)
c.Assert(err, IsNil)
requests = []*protocol.Request{}
wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(len(requests), Equals, 20000)
}
func (_ *WalSuite) TestRequestNumberRollOverAndIndexing(c *C) {
wal := newWal(c)
firstRequestNumber := uint32(math.MaxUint32 - 5000)
wal.logFiles[0].state.LargestRequestNumber = firstRequestNumber
var i uint32
for i = 0; i < 20000; i++ {
req := generateRequest(2)
n, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(n, Equals, firstRequestNumber+i+1)
}
wal.Close()
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(0, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(len(requests), Equals, 15000)
}
func (_ *WalSuite) TestRecoveryFromCrash(c *C) {
wal := newWal(c)
req := generateRequest(2)
@ -376,10 +447,11 @@ func (_ *WalSuite) TestIndex(c *C) {
}
c.Assert(wal.logFiles[0].state.Index.Entries, HasLen, 3)
requestOffset := wal.logFiles[0].state.Index.requestOffset(2001)
state := wal.logFiles[len(wal.logFiles)-1].state
requestOffset := wal.logFiles[0].state.Index.requestOffset(state, 2001)
c.Assert(requestOffset > 0, Equals, true)
// request 2000 should be in the second block not the third block
c.Assert(requestOffset > wal.logFiles[0].state.Index.requestOffset(2000), Equals, true)
c.Assert(requestOffset > wal.logFiles[0].state.Index.requestOffset(state, 2000), Equals, true)
}
func (_ *WalSuite) TestSequenceNumberAssignment(c *C) {