some bug fixing

pull/269/head
John Shahid 2014-02-14 12:25:57 -05:00
parent 4e62287ac9
commit 6ec59953fd
4 changed files with 76 additions and 32 deletions

View File

@ -1,6 +1,7 @@
package wal
import (
logger "code.google.com/p/log4go"
"sort"
)
@ -42,6 +43,7 @@ func (self *index) findOffsetBlock(requestNumber uint32) func(int) bool {
func (self *index) requestOffset(requestNumber uint32) uint64 {
numberOfEntries := len(self.Entries)
if numberOfEntries == 0 {
logger.Info("no index entries, assuming beginning of the file")
return 0
}
index := sort.Search(numberOfEntries, self.findOffsetBlock(requestNumber))
@ -55,7 +57,7 @@ func (self *index) requestOffset(requestNumber uint32) uint64 {
}
lastEntry := self.Entries[numberOfEntries-1]
if requestNumber > lastEntry.StartRequestNumber {
if requestNumber >= lastEntry.StartRequestNumber {
return lastEntry.StartOffset
}

View File

@ -61,8 +61,8 @@ func (self *log) internalIndex() error {
return nil
}
startRequestNumber := self.state.CurrentRequestNumber - uint32(self.state.RequestsSinceLastIndex)
logger.Info("Creating new index entry [%d,%d]", startRequestNumber, self.state.RequestsSinceLastBookmark)
startRequestNumber := self.state.CurrentRequestNumber - uint32(self.state.RequestsSinceLastIndex) + 1
logger.Info("Creating new index entry [%d,%d]", startRequestNumber, self.state.RequestsSinceLastIndex)
self.state.Index.addEntry(startRequestNumber, self.state.RequestsSinceLastIndex, self.fileSize)
self.state.RequestsSinceLastIndex = 0
return nil
@ -111,6 +111,10 @@ func (self *log) recover() error {
return err
}
self.state.RequestsSinceLastBookmark = 0
self.state.RequestsSinceLastIndex = 0
self.state.setFileOffset(self.state.FileOffset)
// replay the rest of the wal
if _, err := self.file.Seek(self.state.FileOffset, os.SEEK_SET); err != nil {
return err
@ -134,6 +138,7 @@ func (self *log) recover() error {
}
self.state.recover(x)
self.conditionalBookmarkAndIndex()
}
info, err := self.file.Stat()
@ -183,6 +188,26 @@ func (self *log) processEntries() {
}
}
func (self *log) conditionalBookmarkAndIndex() {
shouldFlush := false
self.state.RequestsSinceLastIndex++
if self.state.RequestsSinceLastIndex >= uint32(self.indexBlockSize) {
shouldFlush = true
self.internalIndex()
}
self.state.RequestsSinceLastBookmark++
if self.state.RequestsSinceLastBookmark >= self.bookmarkSize {
shouldFlush = true
self.internalBookmark()
}
self.requestsSinceLastFlush++
if self.requestsSinceLastFlush > self.flushSize || shouldFlush {
self.internalFlush()
}
}
func (self *log) internalAppendRequest(x *entry) {
self.assignSequenceNumbers(x.shardId, x.request)
bytes, err := x.request.Encode()
@ -192,7 +217,6 @@ func (self *log) internalAppendRequest(x *entry) {
var requestNumber uint32
var written, writtenHdrBytes int
var hdr *entryHeader
shouldFlush := false
if err != nil {
goto returnError
@ -206,35 +230,21 @@ func (self *log) internalAppendRequest(x *entry) {
}
writtenHdrBytes, err = hdr.Write(self.file)
if err != nil {
logger.Error("Error while writing header: %s", err)
goto returnError
}
written, err = self.file.Write(bytes)
if err != nil {
logger.Error("Error while writing request: %s", err)
goto returnError
}
if written < len(bytes) {
err = fmt.Errorf("Couldn't write entire request")
logger.Error("Error while writing request: %s", err)
goto returnError
}
self.fileSize += uint64(writtenHdrBytes + written)
self.state.RequestsSinceLastBookmark++
if self.state.RequestsSinceLastBookmark >= self.bookmarkSize {
shouldFlush = true
self.internalBookmark()
}
self.state.RequestsSinceLastIndex++
if self.state.RequestsSinceLastIndex >= uint32(self.indexBlockSize) {
shouldFlush = true
self.internalIndex()
}
self.requestsSinceLastFlush++
if self.requestsSinceLastFlush > self.flushSize || shouldFlush {
self.internalFlush()
}
self.conditionalBookmarkAndIndex()
x.confirmation <- &confirmation{requestNumber, nil}
return
returnError:

View File

@ -1,6 +1,7 @@
package wal
import (
logger "code.google.com/p/log4go"
"configuration"
"fmt"
"os"
@ -29,7 +30,9 @@ func NewWAL(config *configuration.Configuration) (*WAL, error) {
return nil, err
}
logFile, err := os.OpenFile(path.Join(config.WalDir, "log"), os.O_CREATE|os.O_RDWR, 0644)
logFilePath := path.Join(config.WalDir, "log")
logger.Info("Opening log file %s", logFilePath)
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}

View File

@ -127,13 +127,16 @@ func (_ *WalSuite) TestAutoBookmarkAfterRecovery(c *C) {
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(1))
// close andn reopen the wal
wal.Close()
wal, err = NewWAL(wal.config)
c.Assert(err, IsNil)
request = generateRequest(2)
id, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(2))
for i := 0; i < 2; i++ {
wal, err = NewWAL(wal.config)
c.Assert(err, IsNil)
request = generateRequest(2)
id, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(i+2))
}
// make sure the bookmark exist
bookmarkPath := path.Join(wal.config.WalDir, "bookmark")
f, err := os.Open(bookmarkPath)
@ -141,8 +144,8 @@ func (_ *WalSuite) TestAutoBookmarkAfterRecovery(c *C) {
s := &state{}
err = s.read(f)
c.Assert(err, IsNil)
c.Assert(s.ShardLastSequenceNumber[1], Equals, uint64(4))
c.Assert(s.CurrentRequestNumber, Equals, uint32(2))
c.Assert(s.ShardLastSequenceNumber[1], Equals, uint64(6))
c.Assert(s.CurrentRequestNumber, Equals, uint32(3))
}
func (_ *WalSuite) TestAutoBookmarkShouldntHappenTooOften(c *C) {
@ -186,9 +189,35 @@ func (_ *WalSuite) TestReplay(c *C) {
c.Assert(err, IsNil)
}
func (_ *WalSuite) TestIndexAfterRecovery(c *C) {
wal := newWal(c)
for i := 0; i < 1500; i++ {
request := generateRequest(2)
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(i+1))
}
wal.log.closeWithoutBookmark()
wal, err := NewWAL(wal.config)
c.Assert(err, IsNil)
for i := 0; i < 500; i++ {
request := generateRequest(2)
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(1500+i+1))
}
c.Assert(wal.log.state.Index.Entries, HasLen, 2)
c.Assert(wal.log.state.Index.Entries[0].Size, Equals, uint32(1000))
c.Assert(wal.log.state.Index.Entries[0].StartRequestNumber, Equals, uint32(1))
c.Assert(wal.log.state.Index.Entries[1].Size, Equals, uint32(1000))
c.Assert(wal.log.state.Index.Entries[1].StartRequestNumber, Equals, uint32(1001))
}
func (_ *WalSuite) TestIndex(c *C) {
wal := newWal(c)
wal.log.indexBlockSize = 1000
for i := 0; i < 3000; i++ {
request := generateRequest(1)
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})