Merge pull request #3717 from benbjohnson/pd-wal

add WAL recovery
Paul Dix 2015-08-18 17:12:55 -04:00
commit e7f7be4536
2 changed files with 227 additions and 62 deletions

@ -54,6 +54,9 @@ const (
// MetaFileExtension is the file extension for the log files that record new fields and measurements as they are created
MetaFileExtension = "meta"
// CompactionExtension is the file extension we expect for compaction files
CompactionExtension = "CPT"
// MetaFlushInterval is the period after which any compressed meta data in the .meta file will get
// flushed to the index
MetaFlushInterval = 10 * time.Minute
@ -86,9 +89,6 @@ var (
// ErrCompactionRunning is returned if we attempt to run a compaction on a partition that is currently running one
ErrCompactionRunning = errors.New("compaction running")
// ErrCompactionBlock gets returned if we're reading a compressed block and it's a file compaction description
ErrCompactionBlock = errors.New("compaction description")
// ErrMemoryCompactionDone gets returned if flushAndCompact is called to free up memory
// but a compaction has already run to do so
ErrMemoryCompactionDone = errors.New("compaction already run to free up memory")
@ -339,7 +339,7 @@ func (l *Log) DeleteSeries(keys []string) error {
// seriesAndFields objects that were written to it. It ignores file errors since those can't be
// recovered.
func (l *Log) readMetadataFile(fileName string) ([]*seriesAndFields, error) {
f, err := os.OpenFile(fileName, os.O_RDONLY, 0666)
f, err := os.OpenFile(fileName, os.O_RDWR, 0666)
if err != nil {
return nil, err
}
@ -492,6 +492,12 @@ func (l *Log) openPartitionFiles() error {
for _, p := range l.partitions {
go func(p *Partition) {
// Recover from a partial compaction.
if err := p.recoverCompactionFile(); err != nil {
results <- fmt.Errorf("recover compaction files: %s", err)
return
}
fileNames, err := p.segmentFileNames()
if err != nil {
results <- err
@ -651,12 +657,12 @@ func (l *Log) partition(key []byte) *Partition {
id := uint8(h.Sum64()%l.partitionCount + 1)
p := l.partitions[id]
if p == nil {
if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index); err != nil {
p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index)
if err != nil {
panic(err)
} else {
p.enableLogging = l.EnableLogging
l.partitions[id] = p
}
p.enableLogging = l.EnableLogging
l.partitions[id] = p
}
return p
}
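
The rewrite here fixes a classic Go shadowing bug: the old `if p, err := NewPartition(...)` form declared a new `p` scoped to the if/else, so `l.partitions[id]` was populated but the outer `p` returned at the end of the function stayed nil on first access. A minimal standalone sketch of the pitfall (hypothetical widget/newWidget names, not from this codebase):

package main

import "fmt"

type widget struct{ id int }

func newWidget(id int) (*widget, error) { return &widget{id: id}, nil }

var cache = map[int]*widget{}

// getBuggy mirrors the old form: the `w` declared by the if statement
// shadows the outer `w`, which is still nil when returned below.
func getBuggy(id int) *widget {
	w := cache[id]
	if w == nil {
		if w, err := newWidget(id); err != nil {
			panic(err)
		} else {
			cache[id] = w // assigns the inner w; the outer w is untouched
		}
	}
	return w // nil the first time any id is requested
}

// getFixed mirrors the new form: declare err separately and assign to the
// outer variable, so the cached value and the returned value agree.
func getFixed(id int) *widget {
	w := cache[id]
	if w == nil {
		var err error
		if w, err = newWidget(id); err != nil {
			panic(err)
		}
		cache[id] = w
	}
	return w
}

func main() {
	fmt.Println(getBuggy(1), getFixed(2)) // <nil> &{2}
}
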
@ -699,10 +705,17 @@ type Partition struct {
lastWriteTime time.Time
enableLogging bool
// Used for mocking OS calls
os struct {
OpenCompactionFile func(name string, flag int, perm os.FileMode) (file *os.File, err error)
OpenSegmentFile func(name string, flag int, perm os.FileMode) (file *os.File, err error)
Rename func(oldpath, newpath string) error
}
}
func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) {
return &Partition{
p := &Partition{
id: id,
path: path,
maxSegmentSize: segmentSize,
@ -712,7 +725,13 @@ func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64
readySeriesSize: readySeriesSize,
index: index,
flushColdInterval: flushColdInterval,
}, nil
}
p.os.OpenCompactionFile = os.OpenFile
p.os.OpenSegmentFile = os.OpenFile
p.os.Rename = os.Rename
return p, nil
}
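
NewPartition seeds these hooks with the real os.OpenFile and os.Rename, so production behavior is unchanged while tests can inject failures, as the recovery test later in this diff does. A self-contained sketch of the same injectable-syscall pattern, with hypothetical store/open names:

package main

import (
	"errors"
	"fmt"
	"os"
)

// store keeps its OS calls as swappable fields defaulting to the real
// implementations, mirroring the Partition.os struct in this diff.
type store struct {
	os struct {
		OpenFile func(name string, flag int, perm os.FileMode) (*os.File, error)
		Rename   func(oldpath, newpath string) error
	}
}

func newStore() *store {
	s := &store{}
	s.os.OpenFile = os.OpenFile
	s.os.Rename = os.Rename
	return s
}

func (s *store) open(name string) error {
	f, err := s.os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		return err
	}
	return f.Close()
}

func main() {
	s := newStore()

	// A test can inject a failure to exercise the error path without
	// touching the filesystem.
	s.os.OpenFile = func(string, int, os.FileMode) (*os.File, error) {
		return nil, errors.New("marker")
	}
	fmt.Println(s.open("ignored")) // marker
}
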
// Close resets the caches and closes the currently open segment file
@ -798,7 +817,6 @@ func (p *Partition) newSegmentFile() error {
}
fileName := p.fileNameForSegment(p.currentSegmentID)
ff, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
@ -816,7 +834,7 @@ func (p *Partition) fileNameForSegment(id uint32) string {
// compactionFileName is the name of the temporary file used for compaction
func (p *Partition) compactionFileName() string {
return filepath.Join(p.path, fmt.Sprintf("%02d.%06d.CPT", p.id, 1))
return filepath.Join(p.path, fmt.Sprintf("%02d.%06d.%s", p.id, 1, CompactionExtension))
}
// fileIDFromName will return the segment ID from the file name
@ -1012,7 +1030,7 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
}
// all compacted data from the segments will go into this file
compactionFile, err := os.OpenFile(p.compactionFileName(), os.O_CREATE|os.O_RDWR, 0666)
compactionFile, err := p.os.OpenCompactionFile(p.compactionFileName(), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
@ -1028,7 +1046,7 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
break
}
f, err := os.OpenFile(n, os.O_RDONLY, 0666)
f, err := p.os.OpenSegmentFile(n, os.O_RDONLY, 0666)
if err != nil {
return err
}
@ -1036,15 +1054,12 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
sf := newSegment(f)
var entries []*entry
for {
a, err := sf.readCompressedBlock()
if err == ErrCompactionBlock {
continue
name, a, err := sf.readCompressedBlock()
if name != "" {
continue // skip name blocks
} else if err != nil {
return err
}
if a == nil {
} else if a == nil {
break
}
@ -1056,7 +1071,7 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
}
}
if err := p.writeCompactionEntry(compactionFile, entries); err != nil {
if err := p.writeCompactionEntry(compactionFile, f.Name(), entries); err != nil {
return err
}
@ -1080,13 +1095,13 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
return os.Remove(compactionFile.Name())
}
return os.Rename(compactionFile.Name(), p.fileNameForSegment(1))
return p.os.Rename(compactionFile.Name(), p.fileNameForSegment(1))
}
// writeCompactionEntry will write a marker with the name of the segment file we're compacting,
// followed by a compressed block for all of its entries
func (p *Partition) writeCompactionEntry(f *os.File, entries []*entry) error {
if err := p.writeCompactionFileName(f); err != nil {
func (p *Partition) writeCompactionEntry(f *os.File, filename string, entries []*entry) error {
if err := p.writeCompactionFileName(f, filename); err != nil {
return err
}
@ -1094,8 +1109,8 @@ func (p *Partition) writeCompactionEntry(f *os.File, entries []*entry) error {
for _, e := range entries {
block = append(block, marshalWALEntry(e.key, e.timestamp, e.data)...)
}
b := snappy.Encode(nil, block)
if _, err := f.Write(u64tob(uint64(len(b)))); err != nil {
return err
}
@ -1104,17 +1119,12 @@ func (p *Partition) writeCompactionEntry(f *os.File, entries []*entry) error {
return err
}
if err := p.writeCompactionFileName(f); err != nil {
return err
}
return f.Sync()
}
// writeCompactionFileName will write a compaction log length entry and the name of the file being compacted
func (p *Partition) writeCompactionFileName(f *os.File) error {
name := []byte(f.Name())
length := u64tob(uint64(len(name)))
func (p *Partition) writeCompactionFileName(f *os.File, filename string) error {
length := u64tob(uint64(len([]byte(filename))))
// the beginning of the length has two bytes to indicate that this is a compaction log entry
length[0] = 0xFF
@ -1124,13 +1134,86 @@ func (p *Partition) writeCompactionFileName(f *os.File) error {
return err
}
if _, err := f.Write(name); err != nil {
if _, err := f.Write([]byte(filename)); err != nil {
return err
}
return nil
}
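
The resulting on-disk name block is therefore an 8-byte big-endian length whose leading two bytes are overwritten with the marker sequence, followed by the raw file name. A sketch of the round trip; the big-endian u64tob/btou64 bodies and the 0xFF value of the second marker byte are assumptions here, since the diff shows only length[0] = 0xFF and a two-byte comparison against CompactSequence:

package main

import (
	"encoding/binary"
	"fmt"
)

// Assumed big-endian helpers standing in for the WAL package's
// u64tob/btou64, whose bodies are not shown in this diff.
func u64tob(v uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, v)
	return b
}

func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }

func main() {
	// Writing a name block: an 8-byte length whose first two bytes are
	// overwritten with the marker, followed by the raw file name.
	name := "01.000001.wal"
	length := u64tob(uint64(len(name)))
	length[0] = 0xFF
	length[1] = 0xFF // the diff elides this assignment; 0xFF is assumed
	block := append(length, name...)

	// Reading it back: detect the marker, zero those bytes, and decode
	// the rest as an ordinary length. Real lengths stay far below 2^48,
	// so the top two bytes of a normal length are always zero and the
	// marker is unambiguous.
	hdr := block[:8]
	if hdr[0] == 0xFF && hdr[1] == 0xFF {
		hdr[0], hdr[1] = 0x00, 0x00
		n := btou64(hdr)
		fmt.Printf("name block: %q\n", block[8:8+n])
	}
}
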
// recoverCompactionFile checks for a partially written compaction file, removes any segment
// files whose data it already holds, and then either promotes it to a segment file or discards it.
func (p *Partition) recoverCompactionFile() error {
path := p.compactionFileName()
// Open compaction file. Ignore if it doesn't exist.
f, err := p.os.OpenCompactionFile(path, os.O_RDWR, 0666)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer f.Close()
// Iterate through all named blocks.
sf := newSegment(f)
var hasData bool
for {
// Only read named blocks.
name, a, err := sf.readCompressedBlock()
if err != nil {
return fmt.Errorf("read name block: %s", err)
} else if name == "" && a == nil {
break // eof
} else if name == "" {
continue // skip unnamed blocks
}
// Read data for the named block.
if s, entries, err := sf.readCompressedBlock(); err != nil {
return fmt.Errorf("read data block: %s", err)
} else if s != "" {
return fmt.Errorf("unexpected double name block")
} else if entries == nil {
break // eof
}
// If data exists then ensure the underlying segment is deleted.
if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove segment: filename=%s, err=%s", name, err)
}
// Flag the compaction file as having data so that it will be renamed.
hasData = true
}
f.Close()
// If the compaction file did not have at least one named block written to
// it then it should be removed. This check is performed to ensure a partial
// compaction file does not overwrite an original segment file.
if !hasData {
if err := os.Remove(path); err != nil {
return fmt.Errorf("remove compaction file: %s", err)
}
return nil
}
// Double check that we are not renaming the compaction file over an
// existing segment file. The segment file should be removed in the
// recovery process but this simply double checks that removal occurred.
newpath := p.fileNameForSegment(1)
if _, err := os.Stat(newpath); !os.IsNotExist(err) {
return fmt.Errorf("cannot rename compaction file, segment exists: filename=%s", newpath)
}
// Rename compaction file to the first segment file.
if err := p.os.Rename(path, newpath); err != nil {
return fmt.Errorf("rename compaction file: %s", err)
}
return nil
}
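
Together with compactFiles, this recovery path amounts to the usual write-then-rename commit protocol: compacted data is written and synced to the temporary .CPT file, the rename to segment 1 is the commit point, and a crash before the rename leaves either a deletable partial .CPT or the original segments intact. A generic sketch of that protocol, with a hypothetical replaceAtomically helper that is not part of this codebase:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// replaceAtomically writes data to a temporary file, syncs it, and renames
// it over the target, so a reader only ever observes the old contents or
// the complete new ones.
func replaceAtomically(target string, data []byte) error {
	tmp := target + ".tmp" // plays the role of the .CPT compaction file
	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // durable before the commit point
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	// The rename is the commit point; recovery from a crash before it only
	// has to delete the leftover temporary file.
	return os.Rename(tmp, target)
}

func main() {
	path := filepath.Join(os.TempDir(), "segment.demo")
	fmt.Println(replaceAtomically(path, []byte("compacted entries")))
}
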
// readFile will read a segment file and marshal its entries into the cache
func (p *Partition) readFile(path string) (entries []*entry, err error) {
id, err := p.fileIDFromName(path)
@ -1145,10 +1228,9 @@ func (p *Partition) readFile(path string) (entries []*entry, err error) {
sf := newSegment(f)
for {
a, err := sf.readCompressedBlock()
if err == ErrCompactionBlock {
continue
name, a, err := sf.readCompressedBlock()
if name != "" {
continue // skip name blocks
} else if err != nil {
f.Close()
return nil, err
@ -1300,31 +1382,27 @@ func newSegment(f *os.File) *segment {
// readCompressedBlock will read the next compressed block from the file and marshal the entries.
// If the block is a compaction file name marker, the name is returned instead. If we've hit the
// end of the file or corruption, the name and entries will both be zero values.
func (s *segment) readCompressedBlock() (entries []*entry, err error) {
func (s *segment) readCompressedBlock() (name string, entries []*entry, err error) {
blockSize := int64(0)
n, err := s.f.Read(s.length)
if err == io.EOF {
return nil, nil
return "", nil, nil
} else if err != nil {
return nil, err
return "", nil, fmt.Errorf("read length: %s", err)
} else if n != len(s.length) {
// seek back before this length so we can start overwriting the file from here
log.Println("unable to read the size of a data block from file: ", s.f.Name())
s.f.Seek(-int64(n), 1)
return "", nil, nil
}
blockSize += int64(n)
if n != len(s.length) {
log.Println("unable to read the size of a data block from file: ", s.f.Name())
// seek back before this length so we can start overwriting the file from here
s.f.Seek(-int64(n), 1)
return nil, nil
}
// Compacted WAL files will have a magic byte sequence that indicates the next part is a file name
// instead of a compressed block. We can ignore these bytes and the ensuing file name to get to the next block.
isCompactionFileNameBlock := false
if bytes.Compare(s.length[0:2], CompactSequence) == 0 {
s.length[0] = 0x00
s.length[1] = 0x00
isCompactionFileNameBlock = true
isCompactionFileNameBlock := bytes.Equal(s.length[0:2], CompactSequence)
if isCompactionFileNameBlock {
s.length[0], s.length[1] = 0x00, 0x00
}
dataLength := btou64(s.length)
@ -1332,7 +1410,7 @@ func (s *segment) readCompressedBlock() (entries []*entry, err error) {
// make sure we haven't hit the end of the data. the trailing end of the file can be zero bytes
if dataLength == 0 {
s.f.Seek(-int64(len(s.length)), 1)
return nil, nil
return "", nil, nil
}
if len(s.block) < int(dataLength) {
@ -1341,7 +1419,7 @@ func (s *segment) readCompressedBlock() (entries []*entry, err error) {
n, err = s.f.Read(s.block[:dataLength])
if err != nil {
return nil, err
return "", nil, fmt.Errorf("read block: %s", err)
}
blockSize += int64(n)
@ -1353,34 +1431,34 @@ func (s *segment) readCompressedBlock() (entries []*entry, err error) {
// seek back to before this block and its size so we can overwrite the corrupt data
s.f.Seek(-int64(len(s.length)+n), 1)
if err := s.f.Truncate(s.size); err != nil {
return nil, err
return "", nil, fmt.Errorf("truncate(0): sz=%d, err=%s", s.size, err)
}
return nil, nil
return "", nil, nil
}
// if this is just the filename from a compaction, return the name instead of entries
if isCompactionFileNameBlock {
return nil, ErrCompactionBlock
return string(s.block[:dataLength]), nil, nil
}
buf, err := snappy.Decode(nil, s.block[:dataLength])
// if there was an error decoding, this is a corrupt block so we zero out the rest of the file
buf, err := snappy.Decode(nil, s.block[:dataLength])
if err != nil {
log.Println("corrupt compressed block in file: ", err.Error(), s.f.Name())
// go back to the start of this block and zero out the rest of the file
s.f.Seek(-int64(len(s.length)+n), 1)
if err := s.f.Truncate(s.size); err != nil {
return nil, err
return "", nil, fmt.Errorf("truncate(1): sz=%d, err=%s", s.size, err)
}
return nil, nil
return "", nil, nil
}
// read in the individual data points from the decompressed wal block
bytesRead := 0
entries = make([]*entry, 0)
for {
if bytesRead >= len(buf) {
break
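
With the new signature, every caller of readCompressedBlock in this diff handles the same four-way protocol on the (name, entries, err) return: a hard error, a name block, end of data, or a data block of entries. A self-contained sketch of that consumer loop, using stand-in blockReader/entry types rather than the WAL package's own:

package main

import "fmt"

type entry struct {
	key       []byte
	data      []byte
	timestamp int64
}

// blockReader stands in for *segment and its readCompressedBlock method.
type blockReader interface {
	readCompressedBlock() (name string, entries []*entry, err error)
}

// drain shows the four-way handling that every caller in this diff uses.
func drain(r blockReader) ([]*entry, error) {
	var all []*entry
	for {
		name, a, err := r.readCompressedBlock()
		if err != nil {
			return nil, err // hard error reading a length or block
		} else if name != "" {
			continue // name block from a compaction; skipped by compactFiles and readFile
		} else if a == nil {
			return all, nil // end of data: EOF or truncated corruption
		}
		all = append(all, a...)
	}
}

// canned replays a fixed sequence of results for demonstration.
type canned struct {
	calls []func() (string, []*entry, error)
}

func (c *canned) readCompressedBlock() (string, []*entry, error) {
	if len(c.calls) == 0 {
		return "", nil, nil // end of data
	}
	fn := c.calls[0]
	c.calls = c.calls[1:]
	return fn()
}

func main() {
	r := &canned{calls: []func() (string, []*entry, error){
		func() (string, []*entry, error) { return "01.000001.wal", nil, nil },
		func() (string, []*entry, error) { return "", []*entry{{key: []byte("cpu")}}, nil },
	}}
	entries, err := drain(r)
	fmt.Println(len(entries), err) // 1 <nil>
}
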

@ -3,10 +3,12 @@ package wal
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"testing"
"time"
@ -685,6 +687,91 @@ func TestWAL_DeleteSeries(t *testing.T) {
}
}
// Ensure a partial compaction can be recovered from.
func TestWAL_Compact_Recovery(t *testing.T) {
log := openTestWAL()
log.partitionCount = 1
log.CompactionThreshold = 0.7
log.ReadySeriesSize = 1024
log.flushCheckInterval = time.Minute
defer log.Close()
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
// Retrieve partition.
p := log.partitions[1]
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
b := make([]byte, 70*5000)
for i := 1; i <= 100; i++ {
buf := bytes.NewBuffer(b)
buf.Reset() // keep the preallocated backing array but drop its zeroed contents
for j := 1; j <= 1000; j++ {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
}
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", rand.Int(), rand.Float64(), i))
// Write the batch out.
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
}
// Mock the segment file opens so that any segment other than the first fails to open.
p.os.OpenSegmentFile = func(name string, flag int, perm os.FileMode) (file *os.File, err error) {
if filepath.Base(name) == "01.000001.wal" {
return os.OpenFile(name, flag, perm)
}
return nil, errors.New("marker")
}
if err := p.flushAndCompact(thresholdFlush); err == nil || err.Error() != "marker" {
t.Fatalf("unexpected flush error: %s", err)
}
p.os.OpenSegmentFile = os.OpenFile
// Append a second file entry to simulate a partial write.
func() {
f, err := os.OpenFile(p.compactionFileName(), os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
t.Fatal(err)
}
defer f.Close()
// Append filename and partial data.
if err := p.writeCompactionEntry(f, "01.000002.wal", []*entry{{key: []byte("foo"), data: []byte("bar"), timestamp: 100}}); err != nil {
t.Fatal(err)
}
// Truncate by a few bytes.
if fi, err := f.Stat(); err != nil {
t.Fatal(err)
} else if err = f.Truncate(fi.Size() - 2); err != nil {
t.Fatal(err)
}
}()
// Now close and re-open the wal and ensure there are no errors.
log.Close()
if err := log.Open(); err != nil {
t.Fatalf("unexpected open error: %s", err)
}
}
// test that partitions get compacted and flushed when number of series hits compaction threshold
// test that partitions get compacted and flushed when a single series hits the compaction threshold
// test that writes slow down when the partition size threshold is hit