fix #284. WAL should recover from files ending prematurely
parent
9513a6165e
commit
8f93eb9336
|
@ -0,0 +1,39 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
// ErrorWithStacktrace is an error that remembers where it was thrown from.
// It optionally wraps an underlying cause, so chains of failures can be
// traced back to the original error.
type ErrorWithStacktrace struct {
	msg        string // human-readable message; may be empty, in which case cause supplies the text
	stacktrace string // stack captured via runtime.Stack at construction time
	cause      error  // underlying error, or nil when this is the root error
}
|
||||
|
||||
// if msg is provided, msg is treated as the error message and the
|
||||
// error is treated as the original cause of this error. Otherwise the
|
||||
// error is used as the error message. This is useful for chaining
|
||||
// multiple errors to trace what was the original error that triggered
|
||||
// the subsequent errors
|
||||
func NewErrorWithStacktrace(cause error, msg ...interface{}) *ErrorWithStacktrace {
|
||||
buffer := make([]byte, 1024, 1042)
|
||||
n := runtime.Stack(buffer, false)
|
||||
message := ""
|
||||
if len(msg) > 0 {
|
||||
msgString := msg[0].(string)
|
||||
message = fmt.Sprintf(msgString, msg[1:])
|
||||
}
|
||||
return &ErrorWithStacktrace{message, string(buffer[:n]), cause}
|
||||
}
|
||||
|
||||
func (self *ErrorWithStacktrace) Error() string {
|
||||
if self.msg == "" {
|
||||
return fmt.Sprintf("%s. Stacktrace:\n%s\n", self.cause, self.stacktrace)
|
||||
}
|
||||
if self.cause == nil {
|
||||
return fmt.Sprintf("%s. Stacktrace:\n%s\n", self.msg, self.stacktrace)
|
||||
}
|
||||
return fmt.Sprintf("%s. Stacktrace:\n%s\nCaused by: %s", self.msg, self.stacktrace, self.cause)
|
||||
}
|
|
@ -270,7 +270,7 @@ func (self *log) replayFromFileLocation(file *os.File,
|
|||
defer func() { close(replayChan) }()
|
||||
for {
|
||||
hdr := &entryHeader{}
|
||||
_, err := hdr.Read(file)
|
||||
numberOfBytes, err := hdr.Read(file)
|
||||
|
||||
if err == io.EOF {
|
||||
return
|
||||
|
@ -299,6 +299,20 @@ func (self *log) replayFromFileLocation(file *os.File,
|
|||
|
||||
bytes := make([]byte, hdr.length)
|
||||
read, err := file.Read(bytes)
|
||||
if err == io.EOF {
|
||||
// file ends prematurely, truncate to the previous request
|
||||
logger.Warn("%s ends prematurely, truncating to last known good request", file.Name())
|
||||
offset, err := file.Seek(int64(-numberOfBytes), os.SEEK_CUR)
|
||||
if err != nil {
|
||||
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
|
||||
return
|
||||
}
|
||||
err = file.Truncate(offset)
|
||||
if err != nil {
|
||||
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
|
||||
return
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package wal
|
||||
|
||||
import (
|
||||
"common"
|
||||
"protocol"
|
||||
)
|
||||
|
||||
|
@ -13,6 +14,6 @@ type replayRequest struct {
|
|||
|
||||
func newErrorReplayRequest(err error) *replayRequest {
|
||||
return &replayRequest{
|
||||
err: err,
|
||||
err: common.NewErrorWithStacktrace(err, "Replay error"),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -253,6 +253,29 @@ func (_ *WalSuite) TestReplay(c *C) {
|
|||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
// TestRecoveryFromCrash verifies the fix for #284: a WAL log file that ends
// prematurely (e.g. a crash mid-write) must still replay the preceding,
// fully-written requests instead of failing.
func (_ *WalSuite) TestRecoveryFromCrash(c *C) {
	// Write one complete request and close the WAL cleanly.
	wal := newWal(c)
	req := generateRequest(2)
	_, err := wal.AssignSequenceNumbersAndLog(req, &MockShard{id: 1})
	c.Assert(err, IsNil)
	c.Assert(wal.Close(), IsNil)
	// Simulate a crash: append an entry header claiming 10 bytes of
	// payload, but write no payload, leaving the file cut off mid-entry.
	filePath := path.Join(wal.config.WalDir, "log.1")
	file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
	c.Assert(err, IsNil)
	defer file.Close()
	hdr := &entryHeader{1, 1, 10}
	_, err = hdr.Write(file)
	c.Assert(err, IsNil)
	// Reopen the WAL over the damaged file; recovery should succeed.
	wal, err = NewWAL(wal.config)
	c.Assert(err, IsNil)
	requests := []*protocol.Request{}
	wal.RecoverServerFromRequestNumber(0, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
		requests = append(requests, req)
		return nil
	})
	// Exactly the one complete request is replayed; the dangling entry
	// must have been discarded.
	c.Assert(requests, HasLen, 1)
}
|
||||
|
||||
func (_ *WalSuite) TestSimultaneousReplay(c *C) {
|
||||
wal := newWal(c)
|
||||
signalChan := make(chan struct{})
|
||||
|
|
Loading…
Reference in New Issue