Fix #369. Fix some edge cases with WAL recovery from crashes

pull/488/merge
John Shahid 2014-05-07 16:46:31 -04:00
parent 7bc4199328
commit 5d5c997f41
3 changed files with 129 additions and 21 deletions

View File

@ -3,6 +3,7 @@
### Bugfixes
- [Issue #511](https://github.com/influxdb/influxdb/issues/511). Don't automatically create the database when a db user is created
- [Issue #369](https://github.com/influxdb/influxdb/issues/369). Fix some edge cases with WAL recovery
## v0.6.1 [2014-05-06]

View File

@ -1,8 +1,6 @@
package wal
import (
"code.google.com/p/goprotobuf/proto"
logger "code.google.com/p/log4go"
"configuration"
"fmt"
"io"
@ -11,6 +9,9 @@ import (
"protocol"
"strconv"
"strings"
"code.google.com/p/goprotobuf/proto"
logger "code.google.com/p/log4go"
)
type log struct {
@ -43,7 +44,40 @@ func newLog(file *os.File, config *configuration.Configuration) (*log, error) {
cachedSuffix: suffix,
}
return l, nil
return l, l.check()
}
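// check scans the log from the beginning, reading one entry header at a
// time. It stops at the first zero-length header, or at an entry whose
// declared length runs past the end of the file, and truncates the log at
// that offset so a request that was only partially written before a crash
// doesn't break later replays.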
func (self *log) check() error {
file, err := self.dupLogFile()
if err != nil {
return err
}
info, err := file.Stat()
if err != nil {
return err
}
size := info.Size()
offset, err := file.Seek(0, os.SEEK_SET)
if err != nil {
return err
}
for {
n, hdr, err := self.getNextHeader(file)
if err != nil {
return err
}
if n == 0 || hdr.length == 0 {
return self.file.Truncate(offset)
}
if offset+int64(n)+int64(hdr.length) > size {
// file is incomplete, truncate
return self.file.Truncate(offset)
}
if err := self.skipRequest(file, hdr); err != nil {
return err
}
offset += int64(n) + int64(hdr.length)
}
}
func (self *log) offset() int64 {
@ -190,6 +224,7 @@ func (self *log) replayFromFileLocation(file *os.File,
stopChan chan struct{}) {
offset, err := file.Seek(0, os.SEEK_CUR)
logger.Info("replaying from file location %d", offset)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
@ -224,30 +259,17 @@ func (self *log) replayFromFileLocation(file *os.File,
bytes := make([]byte, hdr.length)
read, err := file.Read(bytes)
if err == io.EOF {
// file ends prematurely, truncate to the previous request
logger.Warn("%s ends prematurely", file.Name())
offset, err := file.Seek(int64(-numberOfBytes), os.SEEK_CUR)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
logger.Debug("truncating %s to %d", file.Name(), offset)
err = file.Truncate(offset)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
}
return
}
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
if uint32(read) != hdr.length {
sendOrStop(newErrorReplayRequest(fmt.Errorf("expected to read %d but got %d instead", hdr.length, read)), replayChan, stopChan)
// file ends prematurely, probably a request is being written
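// truncation of a partial trailing entry is handled by check() when the
// log is reopened, so replay just stops here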
logger.Debug("%s ends prematurely. Truncating to %d", file.Name(), offset)
return
}
req := &protocol.Request{}
err = req.Decode(bytes)
if err != nil {

View File

@ -368,10 +368,61 @@ func (_ *WalSuite) TestRecoveryFromCrash(c *C) {
filePath := path.Join(wal.config.WalDir, "log.1")
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
c.Assert(err, IsNil)
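// simulate a crash in the middle of a write: the header claims a 500-byte
// entry but only 200 bytes of payload are appended below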
defer file.Close()
hdr := &entryHeader{1, 1, 10}
hdr := &entryHeader{1, 1, 500}
_, err = hdr.Write(file)
c.Assert(err, IsNil)
// write an incomplete request, 200 bytes as opposed to 500 bytes in
// the hdr
_, err = file.Write(make([]byte, 200))
c.Assert(err, IsNil)
defer file.Close()
// the WAL should truncate to just the first request
wal, err = NewWAL(wal.config)
wal.SetServerId(1)
c.Assert(err, IsNil)
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(requests, HasLen, 1)
// make sure the file is truncated
info, err := file.Stat()
c.Assert(err, IsNil)
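// 69 bytes is the single complete request (header plus encoded body);
// the partial entry has been truncated away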
c.Assert(info.Size(), Equals, int64(69))
// make sure appending a new request will increase the size of the
// file by just that request
_, err = wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1})
c.Assert(err, IsNil)
info, err = file.Stat()
c.Assert(err, IsNil)
c.Assert(info.Size(), Equals, int64(69*2))
requests = []*protocol.Request{}
wal.RecoverServerFromRequestNumber(1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(requests, HasLen, 2)
}
func (_ *WalSuite) TestAnotherRecoveryFromCrash(c *C) {
wal := newWal(c)
req := generateRequest(2)
_, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(wal.Close(), IsNil)
filePath := path.Join(wal.config.WalDir, "log.1")
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
c.Assert(err, IsNil)
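// simulate a crash that left an all-zero entry header at the end of the
// log; check() should truncate it away on the next open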
hdr := &entryHeader{0, 0, 0}
_, err = hdr.Write(file)
c.Assert(err, IsNil)
defer file.Close()
// the WAL should truncate to just the first request
wal, err = NewWAL(wal.config)
wal.SetServerId(1)
c.Assert(err, IsNil)
@ -397,6 +448,40 @@ func (_ *WalSuite) TestRecoverWithNonWriteRequests(c *C) {
wal.SetServerId(1)
}
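// Replays repeatedly while 1000 requests are being appended concurrently,
// then reopens the WAL and verifies that all 1000 requests can still be
// recovered.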
func (_ *WalSuite) TestAnotherSimultaneousReplay(c *C) {
wal := newWal(c)
signalChan := make(chan struct{})
go func() {
for i := 0; i < 1000; i++ {
request := generateRequest(1000)
wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
}
signalChan <- struct{}{}
}()
outer:
for {
select {
case <-signalChan:
break outer
default:
wal.RecoverServerFromRequestNumber(uint32(1), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
return nil
})
}
}
c.Assert(wal.Close(), IsNil)
wal, err := NewWAL(wal.config)
c.Assert(err, IsNil)
wal.SetServerId(1)
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(uint32(1), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
c.Assert(len(requests), Equals, 1000)
}
func (_ *WalSuite) TestSimultaneousReplay(c *C) {
wal := newWal(c)
signalChan := make(chan struct{})