diff --git a/tsdb/engine/pd1/wal.go b/tsdb/engine/pd1/wal.go index 26f2af48ff..03548f062e 100644 --- a/tsdb/engine/pd1/wal.go +++ b/tsdb/engine/pd1/wal.go @@ -2,6 +2,7 @@ package pd1 import ( "bytes" + "encoding/json" "fmt" "io" "log" @@ -28,6 +29,8 @@ const ( // defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria defaultFlushCheckInterval = time.Second + + writeBufLen = 32 << 10 // 32kb ) // flushType indiciates why a flush and compaction are being run so the partition can @@ -45,8 +48,8 @@ const ( idleFlush // deleteFlush indicates that we're flushing because series need to be removed from the WAL deleteFlush - - writeBufLen = 32 << 10 // 32kb + // startupFlush indicates that we're flushing because the database is starting up + startupFlush ) // walEntry is a byte written to a wal segment file that indicates what the following compressed block contains @@ -129,7 +132,7 @@ func NewLog(path string) *Log { SegmentSize: DefaultSegmentSize, MemorySizeThreshold: tsdb.DefaultPartitionSizeThreshold, flushCheckInterval: defaultFlushCheckInterval, - logger: log.New(os.Stderr, "[pwl] ", log.LstdFlags), + logger: log.New(os.Stderr, "[pd1wal] ", log.LstdFlags), } } @@ -138,7 +141,7 @@ func (l *Log) Open() error { if l.LoggingEnabled { l.logger.Printf("PD1 WAL starting with %d memory size threshold\n", l.MemorySizeThreshold) - l.logger.Printf("WAL writing to %s\n", l.path) + l.logger.Printf("PD1 WAL writing to %s\n", l.path) } if err := os.MkdirAll(l.path, 0777); err != nil { return err @@ -147,7 +150,11 @@ func (l *Log) Open() error { l.cache = make(map[string]Values) l.cacheDirtySort = make(map[string]bool) l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields) - // TODO: read segment files and flush them all to disk + + // flush out any WAL entries that are there from before + if err := l.readAndFlushWAL(); err != nil { + return err + } l.flushCheckTimer = time.NewTimer(l.flushCheckInterval) @@ -194,6 +201,7 @@ func 
(l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { // make the write durable if specified if !l.SkipDurability { + // write the points pointStrings := make([]string, len(points)) for i, p := range points { pointStrings[i] = p.String() @@ -205,13 +213,47 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme return err } - // TODO: write the fields + // write the new fields + if len(fields) > 0 { + data, err := json.Marshal(fields) + if err != nil { + return err + } + compressed = snappy.Encode(compressed, data) + if err := l.writeToLog(fieldsEntry, compressed); err != nil { + return err + } + } - // TODO: write the series + // write the new series + if len(series) > 0 { + data, err := json.Marshal(series) + if err != nil { + return err + } + compressed = snappy.Encode(compressed, data) + if err := l.writeToLog(seriesEntry, compressed); err != nil { + return err + } + } } - // convert to values that can be either cached in memory or flushed to the index + // add everything to the cache + l.addToCache(points, fields, series) + + // usually skipping the cache is only for testing purposes and this was the easiest + // way to represent the logic (to cache and then immediately flush) + if l.SkipCache { + l.flush(idleFlush) + } + + return nil +} + +func (l *Log) addToCache(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) { l.cacheLock.Lock() + defer l.cacheLock.Unlock() + for _, p := range points { for name, value := range p.Fields() { k := seriesFieldKey(string(p.Key()), name) @@ -235,25 +277,114 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme } l.seriesToCreateCache = append(l.seriesToCreateCache, series...) 
l.lastWriteTime = time.Now() - l.cacheLock.Unlock() +} - // usually skipping the cache is only for testing purposes and this was the easiest - // way to represent the logic (to cache and then immediately flush) - if l.SkipCache { - l.flush(idleFlush) +// readAndFlushWAL is called on open and will read the segment files in, flushing whenever +// the memory gets over the limit. Once all files have been read it will flush and remove the files +func (l *Log) readAndFlushWAL() error { + files, err := l.segmentFileNames() + if err != nil { + return err + } + + // read all the segment files and cache them, flushing along the way if we + // hit memory limits + for _, fn := range files { + if err := l.readFileToCache(fn); err != nil { + return err + } + + if l.memorySize > l.MemorySizeThreshold { + if err := l.flush(memoryFlush); err != nil { + return err + } + } + } + + // now flush and remove all the old files + if err := l.flush(startupFlush); err != nil { + return err } return nil } +func (l *Log) readFileToCache(fileName string) error { + f, err := os.OpenFile(fileName, os.O_RDONLY, 0666) + if err != nil { + return err + } + defer f.Close() + + buf := make([]byte, writeBufLen) + data := make([]byte, writeBufLen) + for { + // read the type and the length of the entry + _, err := io.ReadFull(f, buf[0:5]) + if err == io.EOF { + return nil + } else if err != nil { + l.logger.Printf("error reading segment file %s: %s", fileName, err.Error()) + return err + } + entryType := buf[0] + length := btou32(buf[1:5]) + + // read the compressed block and decompress it + if int(length) > len(buf) { + buf = make([]byte, length) + } + _, err = io.ReadFull(f, buf[0:length]) + if err == io.EOF { + l.logger.Printf("hit end of file while reading compressed wal entry from %s", fileName) + return nil + } else if err != nil { + return err + } + data, err = snappy.Decode(data, buf[0:length]) + if err != nil { + l.logger.Printf("error decoding compressed entry from %s: %s", fileName, 
err.Error()) + return nil + } + + // and marshal it and send it to the cache + switch walEntryType(entryType) { + case pointsEntry: + points, err := tsdb.ParsePoints(data) + if err != nil { + return err + } + l.addToCache(points, nil, nil) + case fieldsEntry: + fields := make(map[string]*tsdb.MeasurementFields) + if err := json.Unmarshal(data, &fields); err != nil { + return err + } + l.addToCache(nil, fields, nil) + case seriesEntry: + series := make([]*tsdb.SeriesCreate, 0) + if err := json.Unmarshal(data, &series); err != nil { + return err + } + l.addToCache(nil, nil, series) + } + } +} + func (l *Log) writeToLog(writeType walEntryType, data []byte) error { l.writeLock.Lock() defer l.writeLock.Unlock() if l.currentSegmentFile == nil { - l.newSegmentFile() + if err := l.newSegmentFile(); err != nil { + // fail hard since we can't write data + panic(fmt.Sprintf("error opening new segment file for wal: %s", err.Error())) + } } + // The panics here are an intentional choice. Based on reports from users + // it's better to fail hard if the database can't take writes. Then they'll + // get alerted and fix whatever is broken. Remove these and face Paul's wrath. 
if _, err := l.currentSegmentFile.Write([]byte{byte(writeType)}); err != nil { panic(fmt.Sprintf("error writing type to wal: %s", err.Error())) } @@ -329,12 +460,14 @@ func (l *Log) close() error { // flush writes all wal data in memory to the index func (l *Log) flush(flush flushType) error { + // only flush if there isn't one already running l.writeLock.Lock() if l.flushRunning { l.writeLock.Unlock() return nil } + // only hold the lock while we rotate the segment file l.flushRunning = true defer func() { l.writeLock.Lock() @@ -363,13 +496,7 @@ func (l *Log) flush(flush flushType) error { valueCount += len(v) } - if l.LoggingEnabled { - ftype := "idle" - if flush == memoryFlush { - ftype = "memory" - } - l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, l.memorySize) - } + flushSize := l.memorySize // reset the memory being used by the cache l.memorySize = 0 @@ -384,6 +511,21 @@ func (l *Log) flush(flush flushType) error { l.cacheLock.Unlock() + // exit if there's nothing to flush to the index + if len(valuesByKey) == 0 && len(mfc) == 0 && len(scc) == 0 { + return nil + } + + if l.LoggingEnabled { + ftype := "idle" + if flush == memoryFlush { + ftype = "memory" + } else if flush == startupFlush { + ftype = "startup" + } + l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, flushSize) + } + startTime := time.Now() if err := l.Index.WriteAndCompact(valuesByKey, mfc, scc); err != nil { return err diff --git a/tsdb/engine/pd1/wal_test.go b/tsdb/engine/pd1/wal_test.go new file mode 100644 index 0000000000..c1ef614650 --- /dev/null +++ b/tsdb/engine/pd1/wal_test.go @@ -0,0 +1,170 @@ +package pd1_test + +import ( + "io/ioutil" + "os" + "reflect" + "testing" + + "github.com/influxdb/influxdb/tsdb" + "github.com/influxdb/influxdb/tsdb/engine/pd1" +) + +func TestWAL_TestWriteQueryOpen(t *testing.T) { + w := NewWAL() + defer w.Cleanup() + + var vals 
map[string]pd1.Values + var fields map[string]*tsdb.MeasurementFields + var series []*tsdb.SeriesCreate + + w.Index = &MockIndexWriter{ + fn: func(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + vals = valuesByKey + fields = measurementFieldsToSave + series = seriesToCreate + return nil + }, + } + + if err := w.Open(); err != nil { + t.Fatalf("error opening: %s", err.Error()) + } + + p1 := parsePoint("cpu,host=A value=1.1 1000000000") + p2 := parsePoint("cpu,host=B value=1.2 1000000000") + p3 := parsePoint("cpu,host=A value=2.1 2000000000") + p4 := parsePoint("cpu,host=B value=2.2 2000000000") + fieldsToWrite := map[string]*tsdb.MeasurementFields{"foo": {Fields: map[string]*tsdb.Field{"bar": {Name: "value"}}}} + seriesToWrite := []*tsdb.SeriesCreate{{Measurement: "asdf"}} + + if err := w.WritePoints([]tsdb.Point{p1, p2}, fieldsToWrite, seriesToWrite); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + c := w.Cursor("cpu,host=A", tsdb.Forward) + k, v := c.Next() + if btou64(k) != uint64(p1.UnixNano()) { + t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k)) + } + if 1.1 != btof64(v) { + t.Fatal("p1 data not equal") + } + c = w.Cursor("cpu,host=B", tsdb.Forward) + k, v = c.Next() + if btou64(k) != uint64(p2.UnixNano()) { + t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), btou64(k)) + } + if 1.2 != btof64(v) { + t.Fatal("p2 data not equal") + } + + k, v = c.Next() + if k != nil { + t.Fatal("expected nil") + } + + // ensure we can do another write to the wal and get stuff + if err := w.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil { + t.Fatalf("failed to write: %s", err.Error()) + } + + c = w.Cursor("cpu,host=A", tsdb.Forward) + k, v = c.Next() + if btou64(k) != uint64(p1.UnixNano()) { + t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), btou64(k)) + } + if 1.1 != btof64(v) { + t.Fatal("p1 data not 
equal") + } + k, v = c.Next() + if btou64(k) != uint64(p3.UnixNano()) { + t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k)) + } + if 2.1 != btof64(v) { + t.Fatal("p3 data not equal") + } + + // ensure we can seek + k, v = c.Seek(u64tob(2000000000)) + if btou64(k) != uint64(p3.UnixNano()) { + t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), btou64(k)) + } + if 2.1 != btof64(v) { + t.Fatal("p3 data not equal") + } + k, v = c.Next() + if k != nil { + t.Fatal("expected nil") + } + + // ensure we close and after open it flushes to the index + if err := w.Close(); err != nil { + t.Fatalf("failed to close: %s", err.Error()) + } + + if err := w.Open(); err != nil { + t.Fatalf("failed to open: %s", err.Error()) + } + + if len(vals["cpu,host=A#value"]) != 2 { + t.Fatal("expected host A values to flush to index on open") + } + + if len(vals["cpu,host=B#value"]) != 1 { + t.Fatal("expected host B values to flush to index on open") + } + + if err := w.WritePoints([]tsdb.Point{p4}, nil, nil); err != nil { + t.Fatalf("failed to write: %s", err.Error()) + } + c = w.Cursor("cpu,host=B", tsdb.Forward) + k, v = c.Next() + if btou64(k) != uint64(p4.UnixNano()) { + t.Fatalf("p4 time wrong:\n\texp:%d\n\tgot:%d\n", p4.UnixNano(), btou64(k)) + } + if 2.2 != btof64(v) { + t.Fatal("p4 data not equal") + } + + if !reflect.DeepEqual(fields, fieldsToWrite) { + t.Fatal("fields not flushed") + } + + if !reflect.DeepEqual(series, seriesToWrite) { + t.Fatal("series not flushed") + } +} + +type Log struct { + *pd1.Log + path string +} + +func NewWAL() *Log { + dir, err := ioutil.TempDir("", "pd1-test") + if err != nil { + panic("couldn't get temp dir") + } + + l := &Log{ + Log: pd1.NewLog(dir), + path: dir, + } + l.LoggingEnabled = true + return l +} + +func (l *Log) Cleanup() error { + l.Close() + os.RemoveAll(l.path) + return nil +} + +type MockIndexWriter struct { + fn func(valuesByKey map[string]pd1.Values, measurementFieldsToSave 
map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error +} + +func (m *MockIndexWriter) WriteAndCompact(valuesByKey map[string]pd1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + return m.fn(valuesByKey, measurementFieldsToSave, seriesToCreate) +}