Add full durability to WAL and flush on startup

pull/4308/head
Paul Dix 2015-09-17 11:23:27 -04:00
parent 82e1be7527
commit 2100e66437
2 changed files with 333 additions and 21 deletions

tsdb/engine/pd1/wal.go

@ -2,6 +2,7 @@ package pd1
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
@ -28,6 +29,8 @@ const (
// defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria
defaultFlushCheckInterval = time.Second
writeBufLen = 32 << 10 // 32kb
)
// flushType indicates why a flush and compaction are being run so the partition can
@ -45,8 +48,8 @@ const (
idleFlush
// deleteFlush indicates that we're flushing because series need to be removed from the WAL
deleteFlush
- writeBufLen = 32 << 10 // 32kb
// startupFlush indicates that we're flushing because the database is starting up
startupFlush
)
// walEntry is a byte written to a wal segment file that indicates what the following compressed block contains
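For reference, the entry layout described here (and parsed by readFileToCache below) is one type byte, a four-byte length, then a snappy-compressed block. A minimal sketch of an encoder for that layout; encodeWALEntry is a hypothetical name, and u32tob is assumed to be the inverse of the btou32 helper used in the reader:

func encodeWALEntry(entryType walEntryType, data []byte) []byte {
	// compress first so the exact payload length is known
	compressed := snappy.Encode(nil, data)
	buf := make([]byte, 0, 5+len(compressed))
	buf = append(buf, byte(entryType))                    // 1 byte: walEntryType
	buf = append(buf, u32tob(uint32(len(compressed)))...) // 4 bytes: compressed length (u32tob assumed)
	return append(buf, compressed...)                     // snappy-compressed payload
}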
@ -129,7 +132,7 @@ func NewLog(path string) *Log {
SegmentSize: DefaultSegmentSize,
MemorySizeThreshold: tsdb.DefaultPartitionSizeThreshold,
flushCheckInterval: defaultFlushCheckInterval,
logger: log.New(os.Stderr, "[pwl] ", log.LstdFlags),
logger: log.New(os.Stderr, "[pd1wal] ", log.LstdFlags),
}
}
@ -138,7 +141,7 @@ func (l *Log) Open() error {
if l.LoggingEnabled {
l.logger.Printf("PD1 WAL starting with %d memory size threshold\n", l.MemorySizeThreshold)
l.logger.Printf("WAL writing to %s\n", l.path)
l.logger.Printf("PD1 WAL writing to %s\n", l.path)
}
if err := os.MkdirAll(l.path, 0777); err != nil {
return err
@ -147,7 +150,11 @@ func (l *Log) Open() error {
l.cache = make(map[string]Values)
l.cacheDirtySort = make(map[string]bool)
l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields)
- // TODO: read segment files and flush them all to disk
// flush out any WAL entries that are there from before
if err := l.readAndFlushWAL(); err != nil {
return err
}
l.flushCheckTimer = time.NewTimer(l.flushCheckInterval)
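Open starts the flush-check timer here; the goroutine that services it sits outside this diff. A sketch of what a timer-driven check plausibly looks like, using only fields that appear in this file (the loop shape, the closing channel, and the unlocked memorySize read are assumptions, not this commit's code):

func (l *Log) flushCheckLoop(closing chan struct{}) {
	for {
		select {
		case <-l.flushCheckTimer.C:
			// trigger a flush if the cache has grown past the threshold
			// (a real implementation would read memorySize under the cache lock)
			if l.memorySize > l.MemorySizeThreshold {
				l.flush(memoryFlush)
			}
			l.flushCheckTimer.Reset(l.flushCheckInterval)
		case <-closing:
			return
		}
	}
}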
@ -194,6 +201,7 @@ func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
// make the write durable if specified
if !l.SkipDurability {
// write the points
pointStrings := make([]string, len(points))
for i, p := range points {
pointStrings[i] = p.String()
@ -205,13 +213,47 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme
return err
}
- // TODO: write the fields
// write the new fields
if len(fields) > 0 {
data, err := json.Marshal(fields)
if err != nil {
return err
}
compressed = snappy.Encode(compressed, data)
if err := l.writeToLog(fieldsEntry, compressed); err != nil {
return err
}
}
- // TODO: write the series
// write the new series
if len(series) > 0 {
data, err := json.Marshal(series)
if err != nil {
return err
}
compressed = snappy.Encode(compressed, data)
if err := l.writeToLog(seriesEntry, compressed); err != nil {
return err
}
}
}
- // convert to values that can be either cached in memory or flushed to the index
// add everything to the cache
l.addToCache(points, fields, series)
// usually skipping the cache is only for testing purposes and this was the easiest
// way to represent the logic (to cache and then immediately flush)
if l.SkipCache {
l.flush(idleFlush)
}
return nil
}
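The hunk above elides the lines between building pointStrings and the fields block. Since readFileToCache below hands the decompressed payload straight to tsdb.ParsePoints, the points payload must be newline-joined line protocol; a sketch of that elided step under that assumption (needs the strings import):

	// serialize the batch as newline-separated line protocol and compress it
	data := []byte(strings.Join(pointStrings, "\n"))
	compressed := snappy.Encode(nil, data)
	if err := l.writeToLog(pointsEntry, compressed); err != nil {
		return err
	}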
func (l *Log) addToCache(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) {
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
for _, p := range points {
for name, value := range p.Fields() {
k := seriesFieldKey(string(p.Key()), name)
@ -235,25 +277,114 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme
}
l.seriesToCreateCache = append(l.seriesToCreateCache, series...)
l.lastWriteTime = time.Now()
- l.cacheLock.Unlock()
}
- // usually skipping the cache is only for testing purposes and this was the easiest
- // way to represent the logic (to cache and then immediately flush)
- if l.SkipCache {
- l.flush(idleFlush)
// readAndFlushWAL is called on open and will read the segment files in, flushing whenever
// the memory gets over the limit. Once all files have been read it will flush and remove the files
func (l *Log) readAndFlushWAL() error {
files, err := l.segmentFileNames()
if err != nil {
return err
}
// read all the segment files and cache them, flushing along the way if we
// hit memory limits
for _, fn := range files {
if err := l.readFileToCache(fn); err != nil {
return err
}
if l.memorySize > l.MemorySizeThreshold {
if err := l.flush(memoryFlush); err != nil {
return err
}
}
}
// now flush and remove all the old files
if err := l.flush(startupFlush); err != nil {
return err
}
return nil
}
func (l *Log) readFileToCache(fileName string) error {
f, err := os.OpenFile(fileName, os.O_RDONLY, 0666)
if err != nil {
return err
}
defer f.Close()
buf := make([]byte, writeBufLen)
data := make([]byte, writeBufLen)
for {
// read the type and the length of the entry
_, err := io.ReadFull(f, buf[0:5])
if err == io.EOF {
return nil
} else if err != nil {
l.logger.Printf("error reading segment file %s: %s", fileName, err.Error())
return err
}
entryType := buf[0]
length := btou32(buf[1:5])
// read the compressed block and decompress it
if int(length) > len(buf) {
buf = make([]byte, length)
}
_, err = io.ReadFull(f, buf[0:length])
if err == io.EOF {
l.logger.Printf("hit end of file while reading compressed wal entry from %s", fileName)
return nil
} else if err != nil {
return err
}
data, err = snappy.Decode(data, buf[0:length])
if err != nil {
l.logger.Printf("error decoding compressed entry from %s: %s", fileName, err.Error())
return nil
}
// and marshal it and send it to the cache
switch walEntryType(entryType) {
case pointsEntry:
points, err := tsdb.ParsePoints(data)
if err != nil {
return err
}
l.addToCache(points, nil, nil)
case fieldsEntry:
fields := make(map[string]*tsdb.MeasurementFields)
if err := json.Unmarshal(data, &fields); err != nil {
return err
}
l.addToCache(nil, fields, nil)
case seriesEntry:
series := make([]*tsdb.SeriesCreate, 0)
if err := json.Unmarshal(data, &series); err != nil {
return err
}
l.addToCache(nil, nil, series)
}
}
}
func (l *Log) writeToLog(writeType walEntryType, data []byte) error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.currentSegmentFile == nil {
- l.newSegmentFile()
if err := l.newSegmentFile(); err != nil {
// fail hard since we can't write data
panic(fmt.Sprintf("error opening new segment file for wal: %s", err.Error()))
}
}
// The panics here are an intentional choice. Based on reports from users
// it's better to fail hard if the database can't take writes. Then they'll
// get alerted and fix whatever is broken. Remove these and face Paul's wrath.
if _, err := l.currentSegmentFile.Write([]byte{byte(writeType)}); err != nil {
panic(fmt.Sprintf("error writing type to wal: %s", err.Error()))
}
@ -329,12 +460,14 @@ func (l *Log) close() error {
// flush writes all wal data in memory to the index
func (l *Log) flush(flush flushType) error {
// only flush if there isn't one already running
l.writeLock.Lock()
if l.flushRunning {
l.writeLock.Unlock()
return nil
}
// only hold the lock while we rotate the segment file
l.flushRunning = true
defer func() {
l.writeLock.Lock()
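The defer that clears flushRunning is cut off by the hunk boundary here; the pattern is a guard flag set under writeLock and cleared under the same lock when the flush finishes, so concurrent callers return immediately instead of queuing behind it. A self-contained sketch of that pattern (illustrative names, not this codebase's exact code):

type flushGuard struct {
	mu      sync.Mutex
	running bool
}

func (g *flushGuard) run(doFlush func() error) error {
	g.mu.Lock()
	if g.running {
		g.mu.Unlock()
		return nil // a flush is already in progress; skip this one
	}
	g.running = true
	g.mu.Unlock()
	defer func() {
		g.mu.Lock()
		g.running = false
		g.mu.Unlock()
	}()
	return doFlush()
}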
@ -363,13 +496,7 @@ func (l *Log) flush(flush flushType) error {
valueCount += len(v)
}
- if l.LoggingEnabled {
- ftype := "idle"
- if flush == memoryFlush {
- ftype = "memory"
- }
- l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, l.memorySize)
- }
flushSize := l.memorySize
// reset the memory being used by the cache
l.memorySize = 0
@ -384,6 +511,21 @@ func (l *Log) flush(flush flushType) error {
l.cacheLock.Unlock()
// exit if there's nothing to flush to the index
if len(valuesByKey) == 0 && len(mfc) == 0 && len(scc) == 0 {
return nil
}
if l.LoggingEnabled {
ftype := "idle"
if flush == memoryFlush {
ftype = "memory"
} else if flush == startupFlush {
ftype = "startup"
}
l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, flushSize)
}
startTime := time.Now()
if err := l.Index.WriteAndCompact(valuesByKey, mfc, scc); err != nil {
return err

tsdb/engine/pd1/wal_test.go (new file, 170 lines)

@ -0,0 +1,170 @@
package pd1_test
import (
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/pd1"
)
func TestWAL_TestWriteQueryOpen(t *testing.T) {
w := NewWAL()
defer w.Cleanup()
var vals map[string]pd1.Values
var fields map[string]*tsdb.MeasurementFields
var series []*tsdb.SeriesCreate
w.Index = &MockIndexWriter{
fn: func(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
vals = valuesByKey
fields = measurementFieldsToSave
series = seriesToCreate
return nil
},
}
if err := w.Open(); err != nil {
t.Fatalf("error opening: %s", err.Error)
}
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=B value=1.2 1000000000")
p3 := parsePoint("cpu,host=A value=2.1 2000000000")
p4 := parsePoint("cpu,host=B value=2.2 2000000000")
fieldsToWrite := map[string]*tsdb.MeasurementFields{"foo": {Fields: map[string]*tsdb.Field{"bar": {Name: "value"}}}}
seriesToWrite := []*tsdb.SeriesCreate{{Measurement: "asdf"}}
if err := w.WritePoints([]tsdb.Point{p1, p2}, fieldsToWrite, seriesToWrite); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
c := w.Cursor("cpu,host=A", tsdb.Forward)
k, v := c.Next()
if btou64(k) != uint64(p1.UnixNano()) {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
}
if 1.1 != btof64(v) {
t.Fatal("p1 data not equal")
}
c = w.Cursor("cpu,host=B", tsdb.Forward)
k, v = c.Next()
if btou64(k) != uint64(p2.UnixNano()) {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k))
}
if 1.2 != btof64(v) {
t.Fatal("p2 data not equal")
}
k, v = c.Next()
if k != nil {
t.Fatal("expected nil")
}
// ensure we can do another write to the wal and get stuff
if err := w.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write: %s", err.Error)
}
c = w.Cursor("cpu,host=A", tsdb.Forward)
k, v = c.Next()
if btou64(k) != uint64(p1.UnixNano()) {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k))
}
if 1.1 != btof64(v) {
t.Fatal("p1 data not equal")
}
k, v = c.Next()
if btou64(k) != uint64(p3.UnixNano()) {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k))
}
if 2.1 != btof64(v) {
t.Fatal("p3 data not equal")
}
// ensure we can seek
k, v = c.Seek(u64tob(2000000000))
if btou64(k) != uint64(p3.UnixNano()) {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k))
}
if 2.1 != btof64(v) {
t.Fatal("p3 data not equal")
}
k, v = c.Next()
if k != nil {
t.Fatal("expected nil")
}
// ensure we close and after open it flushes to the index
if err := w.Close(); err != nil {
t.Fatalf("failed to close: %s", err.Error())
}
if err := w.Open(); err != nil {
t.Fatalf("failed to open: %s", err.Error())
}
if len(vals["cpu,host=A#value"]) != 2 {
t.Fatal("expected host A values to flush to index on open")
}
if len(vals["cpu,host=B#value"]) != 1 {
t.Fatal("expected host B values to flush to index on open")
}
if err := w.WritePoints([]tsdb.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write: %s", err.Error)
}
c = w.Cursor("cpu,host=B", tsdb.Forward)
k, v = c.Next()
if btou64(k) != uint64(p4.UnixNano()) {
t.Fatalf("p4 time wrong:\n\texp:%d\n\tgot:%d\n", p4.UnixNano(), btou64(k))
}
if 2.2 != btof64(v) {
t.Fatal("p4 data not equal")
}
if !reflect.DeepEqual(fields, fieldsToWrite) {
t.Fatal("fields not flushed")
}
if !reflect.DeepEqual(series, seriesToWrite) {
t.Fatal("series not flushed")
}
}
type Log struct {
*pd1.Log
path string
}
func NewWAL() *Log {
dir, err := ioutil.TempDir("", "pd1-test")
if err != nil {
panic("couldn't get temp dir")
}
l := &Log{
Log: pd1.NewLog(dir),
path: dir,
}
l.LoggingEnabled = true
return l
}
func (l *Log) Cleanup() error {
l.Close()
os.RemoveAll(l.path)
return nil
}
type MockIndexWriter struct {
fn func(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
func (m *MockIndexWriter) WriteAndCompact(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return m.fn(valuesByKey, measurementFieldsToSave, seriesToCreate)
}