Compress metadata, add Delete to WAL.

* All metadata for each shard is now stored in a single key with compressed value
* Creation of new metadata no longer requires a synchronous write to Bolt. It is passed to the WAL and written to Bolt periodically outside the write path
* Added DeleteSeries to WAL and updated bz1 to remove series there when DeleteSeries or DropMeasurement are called
pull/3717/head
Paul Dix 2015-08-18 08:10:51 -04:00
parent 3348dab4e0
commit a509df0484
5 changed files with 796 additions and 145 deletions

View File

@ -3,6 +3,7 @@ package bz1
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
@ -60,7 +61,9 @@ type Engine struct {
// WAL represents a write ahead log that can be queried
type WAL interface {
WritePoints(points []tsdb.Point) error
WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
DeleteSeries(keys []string) error
Cursor(key string) tsdb.Cursor
Open() error
Close() error
@ -101,8 +104,6 @@ func (e *Engine) Open() error {
// Initialize data file.
if err := e.db.Update(func(tx *bolt.Tx) error {
_, _ = tx.CreateBucketIfNotExists([]byte("series"))
_, _ = tx.CreateBucketIfNotExists([]byte("fields"))
_, _ = tx.CreateBucketIfNotExists([]byte("points"))
// Set file format, if not set yet.
@ -124,7 +125,7 @@ func (e *Engine) Open() error {
return err
}
return e.WAL.Open()
return nil
}
// Close closes the engine.
@ -149,16 +150,14 @@ func (e *Engine) SetLogOutput(w io.Writer) {}
// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
return e.db.View(func(tx *bolt.Tx) error {
if err := e.db.View(func(tx *bolt.Tx) error {
// Load measurement metadata
meta := tx.Bucket([]byte("fields"))
c := meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
fields, err := e.readFields(tx)
if err != nil {
return err
}
for k, mf := range fields {
m := index.CreateMeasurementIndexIfNotExists(string(k))
mf := &tsdb.MeasurementFields{}
if err := mf.UnmarshalBinary(v); err != nil {
return err
}
for name, _ := range mf.Fields {
m.SetFieldName(name)
}
@ -167,99 +166,50 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
}
// Load series metadata
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
series := tsdb.NewSeries("", nil)
if err := series.UnmarshalBinary(v); err != nil {
return err
}
series, err := e.readSeries(tx)
if err != nil {
return err
}
for k, series := range series {
series.InitializeShards()
index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(k)), series)
}
return nil
})
}); err != nil {
return err
}
// now flush the metadata that was in the WAL, but hadn't yet been flushed
if err := e.WAL.LoadMetadataIndex(index, measurementFields); err != nil {
return err
}
// finally open the WAL up
return e.WAL.Open()
}
// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
// Write series & field metadata.
if len(measurementFieldsToSave) > 0 || len(seriesToCreate) > 0 {
if err := e.db.Update(func(tx *bolt.Tx) error {
if err := e.writeSeries(tx, seriesToCreate); err != nil {
return fmt.Errorf("write series: %s", err)
}
if err := e.writeFields(tx, measurementFieldsToSave); err != nil {
return fmt.Errorf("write fields: %s", err)
}
return nil
}); err != nil {
return err
}
}
// Write points to the WAL.
if err := e.WAL.WritePoints(points); err != nil {
if err := e.WAL.WritePoints(points, measurementFieldsToSave, seriesToCreate); err != nil {
return fmt.Errorf("write points: %s", err)
}
return nil
}
// writeSeries writes a list of series to the metadata.
func (e *Engine) writeSeries(tx *bolt.Tx, a []*tsdb.SeriesCreate) error {
// Ignore if there are no series.
if len(a) == 0 {
return nil
}
// Marshal and insert each series into the metadata.
b := tx.Bucket([]byte("series"))
for _, sc := range a {
// Marshal series into bytes.
data, err := sc.Series.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal series: %s", err)
}
// Insert marshaled data into appropriate key.
if err := b.Put([]byte(sc.Series.Key), data); err != nil {
return fmt.Errorf("put: %s", err)
}
}
return nil
}
// writeFields writes a list of measurement fields to the metadata.
func (e *Engine) writeFields(tx *bolt.Tx, m map[string]*tsdb.MeasurementFields) error {
// Ignore if there are no fields to save.
if len(m) == 0 {
return nil
}
// Persist each measurement field in the map.
b := tx.Bucket([]byte("fields"))
for k, f := range m {
// Marshal field into bytes.
data, err := f.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal measurement field: %s", err)
}
// Insert marshaled data into key.
if err := b.Put([]byte(k), data); err != nil {
return fmt.Errorf("put: %s", err)
}
}
return nil
}
// WriteIndex writes marshaled points to the engine's underlying index.
func (e *Engine) WriteIndex(pointsByKey map[string][][]byte) error {
func (e *Engine) WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return e.db.Update(func(tx *bolt.Tx) error {
// Write series & field metadata.
if err := e.writeNewSeries(tx, seriesToCreate); err != nil {
return fmt.Errorf("write series: %s", err)
}
if err := e.writeNewFields(tx, measurementFieldsToSave); err != nil {
return fmt.Errorf("write fields: %s", err)
}
for key, values := range pointsByKey {
if err := e.writeIndex(tx, key, values); err != nil {
return fmt.Errorf("write: key=%x, err=%s", key, err)
@ -269,6 +219,103 @@ func (e *Engine) WriteIndex(pointsByKey map[string][][]byte) error {
})
}
// writeNewFields merges the passed measurement fields into the set already
// persisted in the meta bucket and writes the combined map back out.
func (e *Engine) writeNewFields(tx *bolt.Tx, measurementFieldsToSave map[string]*tsdb.MeasurementFields) error {
	// nothing to do if there are no new fields
	if len(measurementFieldsToSave) == 0 {
		return nil
	}

	// pull the current field set out of the meta bucket
	existing, err := e.readFields(tx)
	if err != nil {
		return err
	}

	// merge in the new definitions, replacing any stale entries
	for measurement, mf := range measurementFieldsToSave {
		existing[measurement] = mf
	}

	return e.writeFields(tx, existing)
}
// writeFields JSON-encodes the full field map, snappy-compresses it, and
// stores it under the single "fields" key of the meta bucket.
func (e *Engine) writeFields(tx *bolt.Tx, fields map[string]*tsdb.MeasurementFields) error {
	encoded, err := json.Marshal(fields)
	if err != nil {
		return err
	}
	compressed := snappy.Encode(nil, encoded)
	return tx.Bucket([]byte("meta")).Put([]byte("fields"), compressed)
}
// readFields loads the compressed field metadata from the meta bucket and
// returns it keyed by measurement name. A missing key yields an empty
// (non-nil) map.
func (e *Engine) readFields(tx *bolt.Tx) (map[string]*tsdb.MeasurementFields, error) {
	fields := make(map[string]*tsdb.MeasurementFields)

	raw := tx.Bucket([]byte("meta")).Get([]byte("fields"))
	if raw == nil {
		// nothing persisted yet
		return fields, nil
	}

	decompressed, err := snappy.Decode(nil, raw)
	if err != nil {
		return nil, err
	}
	if err := json.Unmarshal(decompressed, &fields); err != nil {
		return nil, err
	}
	return fields, nil
}
// writeNewSeries merges the series to be created into the set already
// persisted in the meta bucket and writes the combined map back out.
func (e *Engine) writeNewSeries(tx *bolt.Tx, seriesToCreate []*tsdb.SeriesCreate) error {
	// nothing to do if there are no new series
	if len(seriesToCreate) == 0 {
		return nil
	}

	// pull the currently persisted series out of the meta bucket
	existing, err := e.readSeries(tx)
	if err != nil {
		return err
	}

	// add the new ones, then compress and persist the whole set
	for _, sc := range seriesToCreate {
		existing[sc.Series.Key] = sc.Series
	}
	return e.writeSeries(tx, existing)
}
// writeSeries JSON-encodes the full series map, snappy-compresses it, and
// stores it under the single "series" key of the meta bucket.
func (e *Engine) writeSeries(tx *bolt.Tx, series map[string]*tsdb.Series) error {
	encoded, err := json.Marshal(series)
	if err != nil {
		return err
	}
	compressed := snappy.Encode(nil, encoded)
	return tx.Bucket([]byte("meta")).Put([]byte("series"), compressed)
}
// readSeries loads the compressed series metadata from the meta bucket and
// returns it keyed by series key. A missing key yields an empty (non-nil) map.
func (e *Engine) readSeries(tx *bolt.Tx) (map[string]*tsdb.Series, error) {
	series := make(map[string]*tsdb.Series)

	raw := tx.Bucket([]byte("meta")).Get([]byte("series"))
	if raw == nil {
		// nothing persisted yet
		return series, nil
	}

	decompressed, err := snappy.Decode(nil, raw)
	if err != nil {
		return nil, err
	}
	if err := json.Unmarshal(decompressed, &series); err != nil {
		return nil, err
	}
	return series, nil
}
// writeIndex writes a set of points for a single key.
func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
// Ignore if there are no points.
@ -398,36 +445,56 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
// DeleteSeries deletes the series from the engine.
func (e *Engine) DeleteSeries(keys []string) error {
// remove it from the WAL first
if err := e.WAL.DeleteSeries(keys); err != nil {
return err
}
return e.db.Update(func(tx *bolt.Tx) error {
series, err := e.readSeries(tx)
if err != nil {
return err
}
for _, k := range keys {
if err := tx.Bucket([]byte("series")).Delete([]byte(k)); err != nil {
return fmt.Errorf("delete series metadata: %s", err)
}
delete(series, k)
if err := tx.Bucket([]byte("points")).DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound {
return fmt.Errorf("delete series data: %s", err)
}
}
return nil
return e.writeSeries(tx, series)
})
}
// DeleteMeasurement deletes a measurement and all related series.
func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
// remove from the WAL first so it won't get flushed after removing from Bolt
if err := e.WAL.DeleteSeries(seriesKeys); err != nil {
return err
}
return e.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket([]byte("fields")).Delete([]byte(name)); err != nil {
fields, err := e.readFields(tx)
if err != nil {
return err
}
delete(fields, name)
if err := e.writeFields(tx, fields); err != nil {
return err
}
series, err := e.readSeries(tx)
if err != nil {
return err
}
for _, k := range seriesKeys {
if err := tx.Bucket([]byte("series")).Delete([]byte(k)); err != nil {
return fmt.Errorf("delete series metadata: %s", err)
}
delete(series, k)
if err := tx.Bucket([]byte("points")).DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound {
return fmt.Errorf("delete series data: %s", err)
}
}
return nil
return e.writeSeries(tx, series)
})
}

View File

@ -23,15 +23,16 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Setup nop mock.
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return nil }
// Write series metadata.
if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{
// Setup mock that writes the index
seriesToCreate := []*tsdb.SeriesCreate{
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), map[string]string{"host": "server0"})},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), map[string]string{"host": "server1"})},
{Series: tsdb.NewSeries("series with spaces", nil)},
}); err != nil {
}
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return e.WriteIndex(nil, nil, seriesToCreate) }
// Write series metadata.
if err := e.WritePoints(nil, nil, seriesToCreate); err != nil {
t.Fatal(err)
}
@ -62,17 +63,18 @@ func TestEngine_LoadMetadataIndex_Fields(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Setup nop mock.
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return nil }
// Write series metadata.
if err := e.WritePoints(nil, map[string]*tsdb.MeasurementFields{
// Setup mock that writes the index
fields := map[string]*tsdb.MeasurementFields{
"cpu": &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{
"value": &tsdb.Field{ID: 0, Name: "value"},
},
},
}, nil); err != nil {
}
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return e.WriteIndex(nil, fields, nil) }
// Write series metadata.
if err := e.WritePoints(nil, fields, nil); err != nil {
t.Fatal(err)
}
@ -150,7 +152,7 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
"mem": [][]byte{
append(u64tob(0), 0x30),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
@ -189,7 +191,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
append(u64tob(20), 0x20),
append(u64tob(30), 0x30),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
@ -201,7 +203,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
append(u64tob(25), 0x25),
append(u64tob(31), 0x31),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
@ -210,7 +212,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
"cpu": [][]byte{
append(u64tob(31), 0xFF),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
@ -239,7 +241,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
func TestEngine_WriteIndex_NoKeys(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
if err := e.WriteIndex(nil); err != nil {
if err := e.WriteIndex(nil, nil, nil); err != nil {
t.Fatal(err)
}
}
@ -248,7 +250,7 @@ func TestEngine_WriteIndex_NoKeys(t *testing.T) {
func TestEngine_WriteIndex_NoPoints(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
if err := e.WriteIndex(map[string][][]byte{"cpu": nil}); err != nil {
if err := e.WriteIndex(map[string][][]byte{"cpu": nil}, nil, nil); err != nil {
t.Fatal(err)
}
}
@ -266,7 +268,7 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
// Write points to index in multiple sets.
for _, set := range sets {
if err := e.WriteIndex(map[string][][]byte(set)); err != nil {
if err := e.WriteIndex(map[string][][]byte(set), nil, nil); err != nil {
t.Fatal(err)
}
}
@ -354,10 +356,16 @@ type EnginePointsWriter struct {
WritePointsFn func(points []tsdb.Point) error
}
func (w *EnginePointsWriter) WritePoints(points []tsdb.Point) error {
func (w *EnginePointsWriter) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return w.WritePointsFn(points)
}
func (w *EnginePointsWriter) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
return nil
}
func (w *EnginePointsWriter) DeleteSeries(keys []string) error { return nil }
func (w *EnginePointsWriter) Open() error { return nil }
func (w *EnginePointsWriter) Close() error { return nil }

View File

@ -21,6 +21,7 @@ package wal
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
@ -70,6 +71,13 @@ const (
// FileExtension is the file extension we expect for wal segments
FileExtension = "wal"
// MetaFileExtension is the file extension for the log files of new fields and measurements that get created
MetaFileExtension = "meta"
// MetaFlushInterval is the period after which any compressed meta data in the .meta file will get
// flushed to the index
MetaFlushInterval = 10 * time.Minute
// defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria
defaultFlushCheckInterval = time.Second
)
@ -90,6 +98,8 @@ const (
// thresholdFlush indicates that we should flush all series over the ReadySize
// and compact all other series
thresholdFlush
// deleteFlush indicates that we're flushing because series need to be removed from the WAL
deleteFlush
)
var (
@ -126,6 +136,9 @@ type Log struct {
mu sync.RWMutex
partitions map[uint8]*Partition
// metaFile is the file that compressed metadata like series and fields are written to
metaFile *os.File
// FlushColdInterval is the period of time after which a partition will do a
// full flush and compaction if it has been cold for writes.
FlushColdInterval time.Duration
@ -136,13 +149,17 @@ type Log struct {
// MaxSeriesSize controls when a partition should get flushed to index and compacted
// if any series in the partition has exceeded this size threshold
MaxSeriesSize int
// ReadySeriesSize is the minimum size a series of points must get to before getting flushed.
ReadySeriesSize int
// CompactionThreshold controls when a partition will be flushed. Once this
// percentage of series in a partition are ready, a flush and compaction will be triggered.
CompactionThreshold float64
// PartitionSizeThreshold specifies when a partition should be forced to be flushed.
PartitionSizeThreshold uint64
// partitionCount is the number of separate partitions to create for the WAL.
// Compactions happen per partition. So this number will affect what percentage
// of the WAL gets compacted at a time. For instance, a setting of 10 means
@ -159,7 +176,7 @@ type IndexWriter interface {
// time ascending points where each byte array is:
// int64 time
// data
WriteIndex(pointsByKey map[string][][]byte) error
WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
func NewLog(path string) *Log {
@ -186,6 +203,11 @@ func (l *Log) Open() error {
return err
}
// open the metafile for writing
if err := l.nextMetaFile(); err != nil {
return err
}
// open the partitions
l.partitions = make(map[uint8]*Partition)
for i := uint64(1); i <= l.partitionCount; i++ {
@ -219,29 +241,255 @@ func (l *Log) Cursor(key string) tsdb.Cursor {
return l.partition([]byte(key)).cursor(key)
}
func (l *Log) WritePoints(points []tsdb.Point) error {
func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
partitionsToWrite := l.pointsToPartitions(points)
// get it to disk
if err := func() error {
l.mu.RLock()
defer l.mu.RUnlock()
if err := l.writeSeriesAndFields(fields, series); err != nil {
l.logger.Println("error writing series and fields: ", err.Error())
return err
}
for p, points := range partitionsToWrite {
if err := p.Write(points); err != nil {
return err
// get it to disk
l.mu.RLock()
defer l.mu.RUnlock()
for p, points := range partitionsToWrite {
if err := p.Write(points); err != nil {
return err
}
}
return nil
}
// Flush forces a flush and compaction across every partition in the WAL.
func (l *Log) Flush() error {
	l.mu.RLock()
	defer l.mu.RUnlock()

	for _, part := range l.partitions {
		if err := part.flushAndCompact(idleFlush); err != nil {
			return err
		}
	}
	return nil
}
// LoadMetadataIndex loads the new series and fields from the WAL meta files into
// the in-memory index, writes them through to the BoltDB index, and then removes
// the meta files. This function should be called before making a call to Open().
func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
metaFiles, err := l.metadataFiles()
if err != nil {
return err
}
// accumulate everything found on disk so it can be written to the index in one call
measurementFieldsToSave := make(map[string]*tsdb.MeasurementFields)
seriesToCreate := make([]*tsdb.SeriesCreate, 0)
// read all the metafiles off disk
for _, fn := range metaFiles {
a, err := l.readMetadataFile(fn)
if err != nil {
return err
}
// loop through the seriesAndFields and add them to the index and the collection to be written to the index
for _, sf := range a {
for k, mf := range sf.Fields {
measurementFieldsToSave[k] = mf
m := index.CreateMeasurementIndexIfNotExists(string(k))
for name, _ := range mf.Fields {
m.SetFieldName(name)
}
// rebuild the codec since it is not serialized with the fields
mf.Codec = tsdb.NewFieldCodec(mf.Fields)
measurementFields[m.Name] = mf
}
for _, sc := range sf.Series {
seriesToCreate = append(seriesToCreate, sc)
sc.Series.InitializeShards()
index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(sc.Series.Key)), sc.Series)
}
}
}
// persist the collected metadata to the BoltDB index
if err := l.Index.WriteIndex(nil, measurementFieldsToSave, seriesToCreate); err != nil {
return err
}
// now remove all the old metafiles since their contents live in the index
for _, fn := range metaFiles {
if err := os.Remove(fn); err != nil {
return err
}
}
return nil
}
// DeleteSeries flushes the metadata that is in the WAL to the index and removes
// all of the given series keys from the cache and segment files of every
// partition. Writes are blocked for the duration. This function is meant to be
// called by bz1 BEFORE it updates its own index, since the metadata is flushed
// here first.
func (l *Log) DeleteSeries(keys []string) error {
	// we want to stop any writes from happening to ensure the data gets cleared
	l.mu.Lock()
	defer l.mu.Unlock()

	// persist any metadata sitting in the WAL before dropping series
	if err := l.flushMetadata(); err != nil {
		return err
	}

	for _, p := range l.partitions {
		// propagate partition errors instead of silently discarding them
		if err := p.deleteSeries(keys); err != nil {
			return err
		}
	}

	return nil
}
// readMetadataFile reads the entire contents of the meta file and returns a
// slice of the seriesAndFields objects that were written in. Corruption
// encountered mid-file is logged and the remainder skipped, since it can't be
// recovered.
func (l *Log) readMetadataFile(fileName string) ([]*seriesAndFields, error) {
	f, err := os.OpenFile(fileName, os.O_RDONLY, 0666)
	if err != nil {
		return nil, err
	}

	a := make([]*seriesAndFields, 0)

	length := make([]byte, 8)
	for {
		// get the length of the compressed seriesAndFields blob.
		// io.ReadFull guards against short reads: a clean EOF ends the
		// loop, a partial header is surfaced as ErrUnexpectedEOF.
		_, err := io.ReadFull(f, length)
		if err == io.EOF {
			break
		} else if err != nil {
			f.Close()
			return nil, err
		}

		dataLength := btou64(length)
		if dataLength == 0 {
			break
		}

		// read in the compressed block and decode it
		b := make([]byte, dataLength)
		if _, err := io.ReadFull(f, b); err == io.EOF {
			break
		} else if err != nil {
			// print the error and move on since we can't recover the file
			l.logger.Println("error reading metadata block: ", err.Error())
			break
		}

		buf, err := snappy.Decode(nil, b)
		if err != nil {
			// print the error and move on since we can't recover the file
			l.logger.Println("error reading compressed metadata info: ", err.Error())
			break
		}

		sf := &seriesAndFields{}
		if err := json.Unmarshal(buf, sf); err != nil {
			// print the error and move on since we can't recover the file
			l.logger.Println("error unmarshaling json for new series and fields: ", err.Error())
			break
		}

		a = append(a, sf)
	}

	if err := f.Close(); err != nil {
		return nil, err
	}

	return a, nil
}
// writeSeriesAndFields will write the compressed fields and series to the meta file. This file persists the data
// in case the server gets shutdown before the WAL has a chance to flush everything to the cache. By default this
// file is flushed on start when bz1 calls LoadMetaDataIndex
func (l *Log) writeSeriesAndFields(fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
if len(fields) == 0 && len(series) == 0 {
return nil
}(); err != nil {
}
sf := &seriesAndFields{Fields: fields, Series: series}
b, err := json.Marshal(sf)
if err != nil {
return err
}
cb := snappy.Encode(nil, b)
l.mu.Lock()
defer l.mu.Unlock()
if _, err := l.metaFile.Write(u64tob(uint64(len(cb)))); err != nil {
return err
}
if _, err := l.metaFile.Write(cb); err != nil {
return err
}
return l.metaFile.Sync()
}
// nextMetaFile will close the current file if there is one open and open a new
// file to log metadata updates to. This function assumes that you've locked
// l.mu elsewhere.
func (l *Log) nextMetaFile() error {
	if l.metaFile != nil {
		if err := l.metaFile.Close(); err != nil {
			return err
		}
	}

	metaFiles, err := l.metadataFiles()
	if err != nil {
		return err
	}

	id := 0
	if len(metaFiles) > 0 {
		num := strings.Split(filepath.Base(metaFiles[len(metaFiles)-1]), ".")[0]
		n, err := strconv.ParseInt(num, 10, 32)
		if err != nil {
			return err
		}
		// advance past the highest existing id so we never reopen (and later
		// delete, via flushMetadata) a file that still holds unflushed metadata
		id = int(n) + 1
	}

	nextFileName := filepath.Join(l.path, fmt.Sprintf("%06d.%s", id, MetaFileExtension))
	l.metaFile, err = os.OpenFile(nextFileName, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		return err
	}

	return nil
}
func (l *Log) Flush() error {
return fmt.Errorf("explicit call to flush isn't implemented yet")
// metadataFiles returns the sorted list of files in the WAL directory that
// carry the MetaFileExtension.
func (l *Log) metadataFiles() ([]string, error) {
	pattern := filepath.Join(l.path, fmt.Sprintf("*.%s", MetaFileExtension))
	names, err := filepath.Glob(pattern)
	if err != nil {
		return nil, err
	}
	sort.Strings(names)
	return names, nil
}
// pointsToPartitions returns a map that organizes the points into the partitions they should be mapped to
@ -316,6 +564,11 @@ func (l *Log) close() error {
}
}
if err := l.metaFile.Close(); err != nil {
return err
}
l.metaFile = nil
return nil
}
@ -337,10 +590,13 @@ func (l *Log) triggerAutoFlush() {
func (l *Log) autoflusher(closing chan struct{}) {
defer l.wg.Done()
metaFlushTicker := time.NewTicker(MetaFlushInterval)
for {
// Wait for close or flush signal.
select {
case <-closing:
metaFlushTicker.Stop()
return
case <-l.flushCheckTimer.C:
l.triggerAutoFlush()
@ -349,10 +605,61 @@ func (l *Log) autoflusher(closing chan struct{}) {
if err := l.Flush(); err != nil {
l.logger.Printf("flush error: %s", err)
}
case <-metaFlushTicker.C:
if err := l.flushMetadata(); err != nil {
l.logger.Printf("metadata flush error: %s", err.Error())
}
}
}
}
// flushMetadata rolls over to a new metafile so writes can continue, then
// replays the contents of all previous metafiles into the index. After a
// successful write the old metadata files are removed. New metadata writes are
// not blocked while the flush to the index is happening.
func (l *Log) flushMetadata() error {
	files, err := l.metadataFiles()
	if err != nil {
		return err
	}
	if err := l.nextMetaFile(); err != nil {
		return err
	}

	fieldsByMeasurement := make(map[string]*tsdb.MeasurementFields)
	seriesToCreate := make([]*tsdb.SeriesCreate, 0)

	// collect the measurement fields and series from every old metafile
	for _, name := range files {
		entries, err := l.readMetadataFile(name)
		if err != nil {
			return err
		}
		for _, entry := range entries {
			for measurement, mf := range entry.Fields {
				fieldsByMeasurement[measurement] = mf
			}
			seriesToCreate = append(seriesToCreate, entry.Series...)
		}
	}

	// persist everything to the index
	if err := l.Index.WriteIndex(nil, fieldsByMeasurement, seriesToCreate); err != nil {
		return err
	}

	// the metadata now lives in the index, so the old files can go
	for _, name := range files {
		if err := os.Remove(name); err != nil {
			return err
		}
	}

	return nil
}
// walPartition returns the partition number that key belongs to.
func (l *Log) partition(key []byte) *Partition {
h := fnv.New64a()
@ -688,7 +995,7 @@ func (p *Partition) flushAndCompact(flush flushType) error {
fmt.Printf("compacting %d series from partition %d\n", len(c.seriesToFlush), p.id)
// write the data to the index first
if err := p.index.WriteIndex(c.seriesToFlush); err != nil {
if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil {
// if we can't write the index, we should just bring down the server hard
panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error()))
}
@ -707,6 +1014,12 @@ func (p *Partition) flushAndCompact(flush flushType) error {
p.mu.Unlock()
}()
err = p.compactFiles(c, flush)
fmt.Printf("compaction of partition %d took %s\n", p.id, time.Since(startTime))
return err
}
func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
// now compact all the old data
fileNames, err := p.segmentFileNames()
if err != nil {
@ -777,8 +1090,6 @@ func (p *Partition) flushAndCompact(flush flushType) error {
return err
}
fmt.Printf("compaction of partition %d took %s\n", p.id, time.Since(startTime))
// if it's an idle flush remove the compaction file
if flush == idleFlush {
return os.Remove(compactionFile.Name())
@ -941,6 +1252,39 @@ func (p *Partition) segmentFileNames() ([]string, error) {
return filepath.Glob(path)
}
// deleteSeries will perform a compaction on the partition, removing all data
// from any of the series passed in. The partition lock is held for the
// duration, so writes to the partition are blocked.
func (p *Partition) deleteSeries(keys []string) error {
p.mu.Lock()
defer p.mu.Unlock()
// NOTE(review): compactionRunning is set here but never cleared in this
// function — presumably compactFiles or a caller resets it; confirm.
p.compactionRunning = true
// remove the series from the cache and prepare the compaction info
size := 0
seriesToFlush := make(map[string][][]byte)
for _, k := range keys {
s := p.cache[k]
if s != nil {
seriesToFlush[k] = s
size += p.cacheSizes[k]
delete(p.cache, k)
delete(p.cacheDirtySort, k)
delete(p.cacheSizes, k)
}
}
// the removed series are handed to the compaction as seriesToFlush with the
// deleteFlush type — how compactFiles treats them is not visible here
c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size}
// roll over a new segment file so we can compact all the old ones
if err := p.newSegmentFile(); err != nil {
return err
}
c.compactFilesLessThan = p.currentSegmentID
return p.compactFiles(c, deleteFlush)
}
// compactionInfo is a data object with information about a compaction running
// and the series that will be flushed to the index
type compactionInfo struct {
@ -1097,6 +1441,13 @@ func (c *cursor) Next() (key, value []byte) {
}
// seriesAndFields is a data struct used to serialize new series and fields,
// as one JSON-encoded and snappy-compressed blob, into the WAL meta files.
type seriesAndFields struct {
// Fields maps measurement name to its field definitions.
Fields map[string]*tsdb.MeasurementFields `json:"fields,omitempty"`
// Series lists the series that need to be created in the index.
Series []*tsdb.SeriesCreate `json:"series,omitempty"`
}
// marshalWALEntry encodes point data into a single byte slice.
//
// The format of the byte slice is:

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"reflect"
"testing"
"time"
@ -38,7 +39,7 @@ func TestWAL_WritePoints(t *testing.T) {
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
p3 := parsePoint("cpu,host=B value=1.0 1", codec)
if err := log.WritePoints([]tsdb.Point{p1, p2, p3}); err != nil {
if err := log.WritePoints([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -84,7 +85,7 @@ func TestWAL_WritePoints(t *testing.T) {
p6 := parsePoint("cpu,host=A value=1.3 2", codec)
// // ensure we can write to a new partition
// p7 := parsePoint("cpu,region=west value=2.2", codec)
if err := log.WritePoints([]tsdb.Point{p4, p5, p6}); err != nil {
if err := log.WritePoints([]tsdb.Point{p4, p5, p6}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -142,7 +143,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
if err := log.WritePoints([]tsdb.Point{p1, p2}); err != nil {
if err := log.WritePoints([]tsdb.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -175,7 +176,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
// now write new data and ensure it's all good
p3 := parsePoint("cpu,host=A value=29.2 6", codec)
if err := log.WritePoints([]tsdb.Point{p3}); err != nil {
if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write point: %s", err.Error())
}
@ -221,7 +222,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
if err := log.WritePoints([]tsdb.Point{p1, p2}); err != nil {
if err := log.WritePoints([]tsdb.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -260,7 +261,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
// now write new data and ensure it's all good
p3 := parsePoint("cpu,host=A value=29.2 6", codec)
if err := log.WritePoints([]tsdb.Point{p3}); err != nil {
if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write point: %s", err.Error())
}
@ -301,7 +302,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte) error {
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
@ -339,7 +340,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
}
// write the batch out
if err := log.WritePoints(parsePoints(buf.String(), codec)); err != nil {
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
buf = bytes.NewBuffer(b)
@ -408,7 +409,7 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte) error {
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
@ -434,7 +435,7 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
}
// write the batch out
if err := log.WritePoints(parsePoints(buf.String(), codec)); err != nil {
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
buf = bytes.NewBuffer(b)
@ -464,6 +465,226 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
}
}
// TestWAL_SeriesAndFieldsGetPersisted verifies that series and measurement-field
// metadata passed to WritePoints survives a WAL close/reopen via
// LoadMetadataIndex, is handed to the index writer on flush, and is not
// replayed again after the WAL has been flushed.
func TestWAL_SeriesAndFieldsGetPersisted(t *testing.T) {
	log := openTestWAL()
	defer log.Close()
	defer os.RemoveAll(log.path)

	if err := log.Open(); err != nil {
		t.Fatalf("couldn't open wal: %s", err.Error())
	}

	codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
		"value": {
			ID:   uint8(1),
			Name: "value",
			Type: influxql.Float,
		},
	})

	// Capture the metadata the WAL hands to the index writer when it flushes.
	var measurementsToIndex map[string]*tsdb.MeasurementFields
	var seriesToIndex []*tsdb.SeriesCreate
	log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
		measurementsToIndex = measurementFieldsToSave
		seriesToIndex = append(seriesToIndex, seriesToCreate...)
		return nil
	}}

	// test that we can write to two different series
	p1 := parsePoint("cpu,host=A value=23.2 1", codec)
	p2 := parsePoint("cpu,host=A value=25.3 4", codec)
	p3 := parsePoint("cpu,host=B value=1.0 1", codec)

	seriesToCreate := []*tsdb.SeriesCreate{
		{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "A"})), map[string]string{"host": "A"})},
		{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "B"})), map[string]string{"host": "B"})},
	}

	measurementsToCreate := map[string]*tsdb.MeasurementFields{
		"cpu": {
			Fields: map[string]*tsdb.Field{
				"value": {ID: 1, Name: "value"},
			},
		},
	}

	if err := log.WritePoints([]tsdb.Point{p1, p2, p3}, measurementsToCreate, seriesToCreate); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// now close it and see if loading the metadata index will populate the measurement and series info
	log.Close()

	idx := tsdb.NewDatabaseIndex()
	mf := make(map[string]*tsdb.MeasurementFields)

	if err := log.LoadMetadataIndex(idx, mf); err != nil {
		t.Fatalf("error loading metadata index: %s", err.Error())
	}

	s := idx.Series("cpu,host=A")
	if s == nil {
		// Fatalf, not Fatal: the message contains a %v verb that must be formatted.
		t.Fatalf("expected to find series cpu,host=A in index %v", idx)
	}
	s = idx.Series("cpu,host=B")
	if s == nil {
		t.Fatal("expected to find series cpu,host=B in index")
	}

	m := mf["cpu"]
	if m == nil {
		t.Fatal("expected to find measurement fields for cpu", mf)
	}
	if m.Fields["value"] == nil {
		t.Fatal("expected to find field definition for 'value'")
	}

	// ensure that they were actually flushed to the index. do it this way because the annoying deepequal doesn't really work for these
	for i, s := range seriesToCreate {
		if seriesToIndex[i].Measurement != s.Measurement {
			t.Fatal("expected measurement to be the same")
		}
		if seriesToIndex[i].Series.Key != s.Series.Key {
			t.Fatal("expected series key to be the same")
		}
		if !reflect.DeepEqual(seriesToIndex[i].Series.Tags, s.Series.Tags) {
			t.Fatal("expected series tags to be the same")
		}
	}

	// ensure that the measurement fields were flushed to the index
	for k, v := range measurementsToCreate {
		m := measurementsToIndex[k]
		if m == nil {
			t.Fatalf("measurement %s wasn't indexed", k)
		}

		if !reflect.DeepEqual(m.Fields, v.Fields) {
			t.Fatal("measurement fields not equal")
		}
	}

	// now open and close the log and try to reload the metadata index, which should now be empty
	if err := log.Open(); err != nil {
		t.Fatalf("error opening log: %s", err.Error())
	}
	if err := log.Close(); err != nil {
		t.Fatalf("error closing log: %s", err.Error())
	}

	idx = tsdb.NewDatabaseIndex()
	mf = make(map[string]*tsdb.MeasurementFields)

	if err := log.LoadMetadataIndex(idx, mf); err != nil {
		t.Fatalf("error loading metadata index: %s", err.Error())
	}

	if len(idx.Measurements()) != 0 || len(mf) != 0 {
		t.Fatal("expected index and measurement fields to be empty")
	}
}
// TestWAL_DeleteSeries verifies that DeleteSeries removes a series' data from
// the WAL, leaves other series intact, flushes the created series to the index
// writer, and that the deleted data stays gone across a close/reopen.
func TestWAL_DeleteSeries(t *testing.T) {
	log := openTestWAL()
	defer log.Close()
	defer os.RemoveAll(log.path)

	if err := log.Open(); err != nil {
		t.Fatalf("couldn't open wal: %s", err.Error())
	}

	codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
		"value": {
			ID:   uint8(1),
			Name: "value",
			Type: influxql.Float,
		},
	})

	// Record every series the WAL flushes to the index writer.
	var indexedSeries []*tsdb.SeriesCreate
	log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
		indexedSeries = append(indexedSeries, seriesToCreate...)
		return nil
	}}

	seriesToCreate := []*tsdb.SeriesCreate{
		{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "A"})), map[string]string{"host": "A"})},
		{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "B"})), map[string]string{"host": "B"})},
	}

	// test that we can write to two different series
	points := []tsdb.Point{
		parsePoint("cpu,host=A value=23.2 1", codec),
		parsePoint("cpu,host=B value=0.9 2", codec),
		parsePoint("cpu,host=A value=25.3 4", codec),
		parsePoint("cpu,host=B value=1.0 3", codec),
	}
	if err := log.WritePoints(points, nil, seriesToCreate); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// expectFirstTime fails unless the first key in the series equals ts.
	expectFirstTime := func(key string, ts uint64, msg string) {
		if k, _ := log.Cursor(key).Next(); btou64(k) != ts {
			t.Fatal(msg)
		}
	}
	// expectEmpty fails if the series has any data at all.
	expectEmpty := func(key string, msg string) {
		if k, _ := log.Cursor(key).Next(); k != nil {
			t.Fatal(msg)
		}
	}

	// ensure data is there
	expectFirstTime("cpu,host=A", 1, "expected data point for cpu,host=A")
	expectFirstTime("cpu,host=B", 2, "expected data point for cpu,host=B")

	// delete the series and ensure metadata was flushed and data is gone
	if err := log.DeleteSeries([]string{"cpu,host=B"}); err != nil {
		t.Fatalf("error deleting series: %s", err.Error())
	}

	// the untouched series keeps its data; the deleted one is empty
	expectFirstTime("cpu,host=A", 1, "expected data point for cpu,host=A")
	expectEmpty("cpu,host=B", "expected no data for cpu,host=B")

	// ensure that they were actually flushed to the index. compare field by
	// field since DeepEqual on the full structs doesn't really work for these
	for i, s := range seriesToCreate {
		if indexedSeries[i].Measurement != s.Measurement {
			t.Fatal("expected measurement to be the same")
		}
		if indexedSeries[i].Series.Key != s.Series.Key {
			t.Fatal("expected series key to be the same")
		}
		if !reflect.DeepEqual(indexedSeries[i].Series.Tags, s.Series.Tags) {
			t.Fatal("expected series tags to be the same")
		}
	}

	// close and re-open the WAL to ensure that the data didn't show back up
	if err := log.Close(); err != nil {
		t.Fatalf("error closing log: %s", err.Error())
	}
	if err := log.Open(); err != nil {
		t.Fatalf("error opening log: %s", err.Error())
	}

	expectFirstTime("cpu,host=A", 1, "expected data point for cpu,host=A")
	expectEmpty("cpu,host=B", "expected no data for cpu,host=B")
}
// test that partitions get compacted and flushed when number of series hits compaction threshold
// test that partitions get compacted and flushed when a single series hits the compaction threshold
// test that writes slow down when the partition size threshold is hit
@ -557,11 +778,11 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
// }
type testIndexWriter struct {
fn func(pointsByKey map[string][][]byte) error
fn func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
func (t *testIndexWriter) WriteIndex(pointsByKey map[string][][]byte) error {
return t.fn(pointsByKey)
func (t *testIndexWriter) WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return t.fn(pointsByKey, measurementFieldsToSave, seriesToCreate)
}
func openTestWAL() *Log {

View File

@ -1018,6 +1018,10 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
return nil
}
// InitializeShards resets the series' shard-ID set to an empty map,
// discarding any previously tracked shard membership.
func (s *Series) InitializeShards() {
	s.shardIDs = map[uint64]bool{}
}
// match returns true if all tags match the series' tags.
func (s *Series) match(tags map[string]string) bool {
for k, v := range tags {