diff --git a/src/wal/index.go b/src/wal/index.go index e2815eb08e..d35804a1cf 100644 --- a/src/wal/index.go +++ b/src/wal/index.go @@ -1,6 +1,7 @@ package wal import ( + logger "code.google.com/p/log4go" "sort" ) @@ -42,6 +43,7 @@ func (self *index) findOffsetBlock(requestNumber uint32) func(int) bool { func (self *index) requestOffset(requestNumber uint32) uint64 { numberOfEntries := len(self.Entries) if numberOfEntries == 0 { + logger.Info("no index entries, assuming beginning of the file") return 0 } index := sort.Search(numberOfEntries, self.findOffsetBlock(requestNumber)) @@ -55,7 +57,7 @@ func (self *index) requestOffset(requestNumber uint32) uint64 { } lastEntry := self.Entries[numberOfEntries-1] - if requestNumber > lastEntry.StartRequestNumber { + if requestNumber >= lastEntry.StartRequestNumber { return lastEntry.StartOffset } diff --git a/src/wal/log.go b/src/wal/log.go index 0acb2975f7..62d39d6eb1 100644 --- a/src/wal/log.go +++ b/src/wal/log.go @@ -61,8 +61,8 @@ func (self *log) internalIndex() error { return nil } - startRequestNumber := self.state.CurrentRequestNumber - uint32(self.state.RequestsSinceLastIndex) - logger.Info("Creating new index entry [%d,%d]", startRequestNumber, self.state.RequestsSinceLastBookmark) + startRequestNumber := self.state.CurrentRequestNumber - uint32(self.state.RequestsSinceLastIndex) + 1 + logger.Info("Creating new index entry [%d,%d]", startRequestNumber, self.state.RequestsSinceLastIndex) self.state.Index.addEntry(startRequestNumber, self.state.RequestsSinceLastIndex, self.fileSize) self.state.RequestsSinceLastIndex = 0 return nil @@ -111,6 +111,10 @@ func (self *log) recover() error { return err } + self.state.RequestsSinceLastBookmark = 0 + self.state.RequestsSinceLastIndex = 0 + self.state.setFileOffset(self.state.FileOffset) + // replay the rest of the wal if _, err := self.file.Seek(self.state.FileOffset, os.SEEK_SET); err != nil { return err @@ -134,6 +138,7 @@ func (self *log) recover() error { } self.state.recover(x) + self.conditionalBookmarkAndIndex() } info, err := self.file.Stat() @@ -183,6 +188,26 @@ func (self *log) processEntries() { } } +func (self *log) conditionalBookmarkAndIndex() { + shouldFlush := false + self.state.RequestsSinceLastIndex++ + if self.state.RequestsSinceLastIndex >= uint32(self.indexBlockSize) { + shouldFlush = true + self.internalIndex() + } + + self.state.RequestsSinceLastBookmark++ + if self.state.RequestsSinceLastBookmark >= self.bookmarkSize { + shouldFlush = true + self.internalBookmark() + } + + self.requestsSinceLastFlush++ + if self.requestsSinceLastFlush > self.flushSize || shouldFlush { + self.internalFlush() + } +} + func (self *log) internalAppendRequest(x *entry) { self.assignSequenceNumbers(x.shardId, x.request) bytes, err := x.request.Encode() @@ -192,7 +217,6 @@ func (self *log) internalAppendRequest(x *entry) { var requestNumber uint32 var written, writtenHdrBytes int var hdr *entryHeader - shouldFlush := false if err != nil { goto returnError @@ -206,35 +230,21 @@ func (self *log) internalAppendRequest(x *entry) { } writtenHdrBytes, err = hdr.Write(self.file) if err != nil { + logger.Error("Error while writing header: %s", err) goto returnError } written, err = self.file.Write(bytes) if err != nil { + logger.Error("Error while writing request: %s", err) goto returnError } if written < len(bytes) { err = fmt.Errorf("Couldn't write entire request") + logger.Error("Error while writing request: %s", err) goto returnError } self.fileSize += uint64(writtenHdrBytes + written) - - self.state.RequestsSinceLastBookmark++ - if self.state.RequestsSinceLastBookmark >= self.bookmarkSize { - shouldFlush = true - self.internalBookmark() - } - - self.state.RequestsSinceLastIndex++ - if self.state.RequestsSinceLastIndex >= uint32(self.indexBlockSize) { - shouldFlush = true - self.internalIndex() - } - - self.requestsSinceLastFlush++ - if self.requestsSinceLastFlush > self.flushSize || shouldFlush { - self.internalFlush() - } - + self.conditionalBookmarkAndIndex() x.confirmation <- &confirmation{requestNumber, nil} return returnError: diff --git a/src/wal/wal.go b/src/wal/wal.go index b3f44b5787..5b86c92317 100644 --- a/src/wal/wal.go +++ b/src/wal/wal.go @@ -1,6 +1,7 @@ package wal import ( + logger "code.google.com/p/log4go" "configuration" "fmt" "os" @@ -29,7 +30,9 @@ func NewWAL(config *configuration.Configuration) (*WAL, error) { return nil, err } - logFile, err := os.OpenFile(path.Join(config.WalDir, "log"), os.O_CREATE|os.O_RDWR, 0644) + logFilePath := path.Join(config.WalDir, "log") + logger.Info("Opening log file %s", logFilePath) + logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_RDWR, 0644) if err != nil { return nil, err } diff --git a/src/wal/wal_test.go b/src/wal/wal_test.go index 476af66e3b..1c886af288 100644 --- a/src/wal/wal_test.go +++ b/src/wal/wal_test.go @@ -127,13 +127,16 @@ func (_ *WalSuite) TestAutoBookmarkAfterRecovery(c *C) { id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) c.Assert(err, IsNil) c.Assert(id, Equals, uint32(1)) + // close andn reopen the wal wal.Close() - wal, err = NewWAL(wal.config) - c.Assert(err, IsNil) - request = generateRequest(2) - id, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) - c.Assert(err, IsNil) - c.Assert(id, Equals, uint32(2)) + for i := 0; i < 2; i++ { + wal, err = NewWAL(wal.config) + c.Assert(err, IsNil) + request = generateRequest(2) + id, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) + c.Assert(err, IsNil) + c.Assert(id, Equals, uint32(i+2)) + } // make sure the bookmark exist bookmarkPath := path.Join(wal.config.WalDir, "bookmark") f, err := os.Open(bookmarkPath) @@ -141,8 +144,8 @@ func (_ *WalSuite) TestAutoBookmarkAfterRecovery(c *C) { s := &state{} err = s.read(f) c.Assert(err, IsNil) - c.Assert(s.ShardLastSequenceNumber[1], Equals, uint64(4)) - c.Assert(s.CurrentRequestNumber, Equals, uint32(2)) + c.Assert(s.ShardLastSequenceNumber[1], Equals, uint64(6)) + c.Assert(s.CurrentRequestNumber, Equals, uint32(3)) } func (_ *WalSuite) TestAutoBookmarkShouldntHappenTooOften(c *C) { @@ -186,9 +189,35 @@ func (_ *WalSuite) TestReplay(c *C) { c.Assert(err, IsNil) } +func (_ *WalSuite) TestIndexAfterRecovery(c *C) { + wal := newWal(c) + for i := 0; i < 1500; i++ { + request := generateRequest(2) + id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) + c.Assert(err, IsNil) + c.Assert(id, Equals, uint32(i+1)) + } + + wal.log.closeWithoutBookmark() + + wal, err := NewWAL(wal.config) + c.Assert(err, IsNil) + + for i := 0; i < 500; i++ { + request := generateRequest(2) + id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) + c.Assert(err, IsNil) + c.Assert(id, Equals, uint32(1500+i+1)) + } + c.Assert(wal.log.state.Index.Entries, HasLen, 2) + c.Assert(wal.log.state.Index.Entries[0].Size, Equals, uint32(1000)) + c.Assert(wal.log.state.Index.Entries[0].StartRequestNumber, Equals, uint32(1)) + c.Assert(wal.log.state.Index.Entries[1].Size, Equals, uint32(1000)) + c.Assert(wal.log.state.Index.Entries[1].StartRequestNumber, Equals, uint32(1001)) +} + func (_ *WalSuite) TestIndex(c *C) { wal := newWal(c) - wal.log.indexBlockSize = 1000 for i := 0; i < 3000; i++ { request := generateRequest(1) id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})