influxdb/wal/log.go

316 lines
7.5 KiB
Go

package wal
import (
"fmt"
"io"
"os"
"path"
"strconv"
"strings"
"code.google.com/p/goprotobuf/proto"
logger "code.google.com/p/log4go"
"github.com/influxdb/influxdb/configuration"
"github.com/influxdb/influxdb/protocol"
)
type log struct {
closed bool
fileSize uint64
file *os.File
requestsSinceLastFlush int
config *configuration.Configuration
cachedSuffix uint32
}
func newLog(file *os.File, config *configuration.Configuration) (*log, error) {
info, err := file.Stat()
if err != nil {
return nil, err
}
size := uint64(info.Size())
suffixString := strings.TrimLeft(path.Base(file.Name()), "log.")
suffix, err := strconv.ParseUint(suffixString, 10, 32)
if err != nil {
return nil, err
}
l := &log{
file: file,
fileSize: size,
closed: false,
config: config,
cachedSuffix: uint32(suffix),
}
return l, l.check()
}
func (self *log) check() error {
file, err := self.dupLogFile()
if err != nil {
return err
}
info, err := file.Stat()
if err != nil {
return err
}
size := info.Size()
offset, err := file.Seek(0, os.SEEK_SET)
if err != nil {
return err
}
for {
n, hdr, err := self.getNextHeader(file)
if err != nil {
return err
}
if n == 0 || hdr.length == 0 {
logger.Warn("%s was truncated to %d since the file has a zero size request", self.file.Name(), offset)
return self.file.Truncate(offset)
}
if offset+int64(n)+int64(hdr.length) > size {
// file is incomplete, truncate
logger.Warn("%s was truncated to %d since the file ends prematurely", self.file.Name(), offset)
return self.file.Truncate(offset)
}
bytes := make([]byte, hdr.length)
_, err = file.Read(bytes)
if err != nil {
return err
}
// this request is invalid truncate file
req := &protocol.Request{}
err = req.Decode(bytes)
if err != nil {
logger.Warn("%s was truncated to %d since the end of the file contains invalid data", self.file.Name(), offset)
// truncate file and return
return self.file.Truncate(offset)
}
offset += int64(n) + int64(hdr.length)
}
}
func (self *log) offset() int64 {
offset, _ := self.file.Seek(0, os.SEEK_CUR)
return offset
}
func (self *log) suffix() uint32 {
return self.cachedSuffix
}
// this is for testing only
func (self *log) syncFile() error {
return self.file.Sync()
}
func (self *log) close() error {
logger.Debug("Closing %s", self.file.Name())
return self.file.Close()
}
func (self *log) delete() error {
logger.Debug("Deleting %s", self.file.Name())
return os.Remove(self.file.Name())
}
func (self *log) appendRequest(request *protocol.Request, shardId uint32) error {
bytes, err := request.Encode()
if err != nil {
return err
}
// every request is preceded with the length, shard id and the request number
hdr := &entryHeader{
shardId: shardId,
requestNumber: request.GetRequestNumber(),
length: uint32(len(bytes)),
}
writtenHdrBytes, err := hdr.Write(self.file)
if err != nil {
logger.Error("Error while writing header: %s", err)
return err
}
written, err := self.file.Write(bytes)
if err != nil {
logger.Error("Error while writing request: %s", err)
return err
}
if written < len(bytes) {
err = fmt.Errorf("Couldn't write entire request")
logger.Error("Error while writing request: %s", err)
return err
}
self.fileSize += uint64(writtenHdrBytes + written)
return nil
}
func (self *log) dupLogFile() (*os.File, error) {
return os.OpenFile(self.file.Name(), os.O_RDWR, 0)
}
// replay requests starting at the given requestNumber and for the
// given shard ids. Return all requests if shardIds is empty
func (self *log) dupAndReplayFromOffset(shardIds []uint32, offset int64, rn uint32) (chan *replayRequest, chan struct{}) {
// this channel needs to be buffered in case the last request in the
// log file caused an error in the yield function
stopChan := make(chan struct{}, 1)
replayChan := make(chan *replayRequest, 10)
go func() {
file, err := self.dupLogFile()
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
close(replayChan)
return
}
defer file.Close()
if err = self.skip(file, offset, rn); err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
close(replayChan)
return
}
shardIdsSet := map[uint32]struct{}{}
for _, shardId := range shardIds {
shardIdsSet[shardId] = struct{}{}
}
self.replayFromFileLocation(file, shardIdsSet, replayChan, stopChan)
}()
return replayChan, stopChan
}
func (self *log) getNextHeader(file *os.File) (int, *entryHeader, error) {
hdr := &entryHeader{}
numberOfBytes, err := hdr.Read(file)
if err == io.EOF {
return 0, nil, nil
}
return numberOfBytes, hdr, err
}
func (self *log) skip(file *os.File, offset int64, rn uint32) error {
if offset == -1 {
_, err := file.Seek(0, os.SEEK_SET)
return err
}
logger.Debug("Replaying from file offset %d", offset)
_, err := file.Seek(int64(offset), os.SEEK_SET)
if err != nil {
return err
}
return self.skipToRequest(file, rn)
}
func (self *log) skipRequest(file *os.File, hdr *entryHeader) (err error) {
_, err = file.Seek(int64(hdr.length), os.SEEK_CUR)
return
}
func (self *log) skipToRequest(file *os.File, requestNumber uint32) error {
for {
n, hdr, err := self.getNextHeader(file)
if n == 0 {
// EOF
return nil
}
if err != nil {
return err
}
if hdr.requestNumber < requestNumber {
if err := self.skipRequest(file, hdr); err != nil {
return err
}
continue
}
// seek back to the beginning of the request header
_, err = file.Seek(int64(-n), os.SEEK_CUR)
return err
}
}
func (self *log) replayFromFileLocation(file *os.File,
shardIdsSet map[uint32]struct{},
replayChan chan *replayRequest,
stopChan chan struct{}) {
offset, err := file.Seek(0, os.SEEK_CUR)
logger.Info("replaying from file location %d", offset)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
defer func() { close(replayChan) }()
for {
numberOfBytes, hdr, err := self.getNextHeader(file)
if numberOfBytes == 0 {
break
}
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
ok := false
if len(shardIdsSet) == 0 {
ok = true
} else {
_, ok = shardIdsSet[hdr.shardId]
}
if !ok {
err = self.skipRequest(file, hdr)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
continue
}
bytes := make([]byte, hdr.length)
read, err := file.Read(bytes)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
if uint32(read) != hdr.length {
// file ends prematurely, probably a request is being written
logger.Debug("%s ends prematurely. Truncating to %d", file.Name(), offset)
return
}
req := &protocol.Request{}
err = req.Decode(bytes)
if err != nil {
sendOrStop(newErrorReplayRequest(err), replayChan, stopChan)
return
}
req.RequestNumber = proto.Uint32(hdr.requestNumber)
replayRequest := &replayRequest{hdr.requestNumber, req, hdr.shardId, offset, offset + int64(numberOfBytes) + int64(hdr.length), nil}
if sendOrStop(replayRequest, replayChan, stopChan) {
return
}
offset = replayRequest.endOffset
}
}
func sendOrStop(req *replayRequest, replayChan chan *replayRequest, stopChan chan struct{}) bool {
if req.err != nil {
logger.Error("Error in replay: %s", req.err)
}
select {
case replayChan <- req:
case _, ok := <-stopChan:
logger.Debug("Stopping replay")
return ok
}
return false
}