From f37df1133920e54890a36e65c363caf8bd6b4949 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Wed, 9 Sep 2015 11:29:50 -0700 Subject: [PATCH] WIP: more WAL work --- cmd/influx_stress/influx_stress.go | 1 + tsdb/config.go | 2 +- tsdb/engine/pd1/encoding.go | 129 +++++-- tsdb/engine/pd1/encoding_test.go | 34 +- tsdb/engine/pd1/pd1.go | 506 ++++++------------------- tsdb/engine/pd1/pd1_test.go | 9 +- tsdb/engine/pd1/wal.go | 581 +++++++++++++++++++++++++++++ 7 files changed, 827 insertions(+), 435 deletions(-) create mode 100644 tsdb/engine/pd1/wal.go diff --git a/cmd/influx_stress/influx_stress.go b/cmd/influx_stress/influx_stress.go index 9fe9e2af4b..2247a5329b 100644 --- a/cmd/influx_stress/influx_stress.go +++ b/cmd/influx_stress/influx_stress.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "math/rand" "net/url" "runtime" "sort" diff --git a/tsdb/config.go b/tsdb/config.go index 9843541e29..dfd267d2c3 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -42,7 +42,7 @@ const ( // we'll need to create backpressure, otherwise we'll fill up the memory and die. // This number multiplied by the parition count is roughly the max possible memory // size for the in-memory WAL cache. - DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB + DefaultPartitionSizeThreshold = 50 * 1024 * 1024 // 50MB ) type Config struct { diff --git a/tsdb/engine/pd1/encoding.go b/tsdb/engine/pd1/encoding.go index 60f72766d8..7262e8a6c3 100644 --- a/tsdb/engine/pd1/encoding.go +++ b/tsdb/engine/pd1/encoding.go @@ -12,52 +12,127 @@ type Value interface { TimeBytes() []byte ValueBytes() []byte Time() time.Time + Value() interface{} + Size() int } +func NewValue(t time.Time, value interface{}) Value { + switch v := value.(type) { + // case int64: + // return &Int64Value{time: t, value: v} + case float64: + return &FloatValue{time: t, value: v} + // case bool: + // return &BoolValue{time: t, value: v} + // case string: + // return &StringValue{time: t, value: v} + } + return &EmptyValue{} +} + +type EmptyValue struct { +} + +func (e *EmptyValue) TimeBytes() []byte { return nil } +func (e *EmptyValue) ValueBytes() []byte { return nil } +func (e *EmptyValue) Time() time.Time { return time.Unix(0, 0) } +func (e *EmptyValue) Value() interface{} { return nil } +func (e *EmptyValue) Size() int { return 0 } + +// Values represented a time ascending sorted collection of Value types. +// the underlying type should be the same across all values, but the interface +// makes the code cleaner. 
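NewValue only wires up float64 for now (the other cases are commented out), and any unsupported type falls back to EmptyValue. A minimal sketch of how the factory and the Values collection declared just below fit together, assuming the float-only path and second-precision timestamps while the nanosecond TODO in the encoder stands:

```go
package pd1_test

import (
	"fmt"
	"time"

	"github.com/influxdb/influxdb/tsdb/engine/pd1"
)

func ExampleValues_roundTrip() {
	// Build a small, time-ascending series of float values.
	vals := pd1.Values{
		pd1.NewValue(time.Unix(10, 0), 1.1),
		pd1.NewValue(time.Unix(20, 0), 2.2),
	}

	// Encode delegates to EncodeFloatBlock because the first element is a
	// *FloatValue; DecodeSameTypeBlock round-trips the same block.
	block := vals.Encode(nil)
	decoded := vals.DecodeSameTypeBlock(block)

	fmt.Println(len(decoded), decoded[0].Value()) // prints: 2 1.1
}
```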
+type Values []Value + +func (v Values) MinTime() int64 { + return v[0].Time().UnixNano() +} + +func (v Values) MaxTime() int64 { + return v[len(v)-1].Time().UnixNano() +} + +func (v Values) Encode(buf []byte) []byte { + switch v[0].(type) { + case *FloatValue: + a := make([]*FloatValue, len(v)) + for i, vv := range v { + a[i] = vv.(*FloatValue) + } + return EncodeFloatBlock(buf, a) + + // TODO: add support for other types + } + + return nil +} + +func (v Values) DecodeSameTypeBlock(block []byte) Values { + switch v[0].(type) { + case *FloatValue: + a, _ := DecodeFloatBlock(block) + return a + + // TODO: add support for other types + } + return nil +} + +// Sort methods +func (a Values) Len() int { return len(a) } +func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a Values) Less(i, j int) bool { return a[i].Time().UnixNano() < a[j].Time().UnixNano() } + type FloatValue struct { - Time time.Time - Value float64 + time time.Time + value float64 +} + +func (f *FloatValue) Time() time.Time { + return f.time +} + +func (f *FloatValue) Value() interface{} { + return f.value } func (f *FloatValue) TimeBytes() []byte { - return u64tob(uint64(f.Time.UnixNano())) + return u64tob(uint64(f.Time().UnixNano())) } func (f *FloatValue) ValueBytes() []byte { buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, math.Float64bits(f.Value)) + binary.BigEndian.PutUint64(buf, math.Float64bits(f.value)) return buf } -type FloatValues []FloatValue - -func (a FloatValues) Len() int { return len(a) } -func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a FloatValues) Less(i, j int) bool { return a[i].Time.UnixNano() < a[j].Time.UnixNano() } - -// TODO: make this work with nanosecond timestamps -func EncodeFloatBlock(buf []byte, values []FloatValue) []byte { - s := tsz.New(uint32(values[0].Time.Unix())) - for _, v := range values { - s.Push(uint32(v.Time.Unix()), v.Value) - } - s.Finish() - return append(u64tob(uint64(values[0].Time.UnixNano())), s.Bytes()...) +func (f *FloatValue) Size() int { + return 16 } -func DecodeFloatBlock(block []byte) ([]FloatValue, error) { +// TODO: make this work with nanosecond timestamps +func EncodeFloatBlock(buf []byte, values []*FloatValue) []byte { + s := tsz.New(uint32(values[0].Time().Unix())) + for _, v := range values { + s.Push(uint32(v.Time().Unix()), v.value) + } + s.Finish() + return append(u64tob(uint64(values[0].Time().UnixNano())), s.Bytes()...) 
+} + +func DecodeFloatBlock(block []byte) ([]Value, error) { iter, _ := tsz.NewIterator(block[8:]) - a := make([]FloatValue, 0) + a := make([]Value, 0) for iter.Next() { t, f := iter.Values() - a = append(a, FloatValue{time.Unix(int64(t), 0), f}) + a = append(a, &FloatValue{time.Unix(int64(t), 0), f}) } return a, nil } type BoolValue struct { - Time time.Time - Value bool + time time.Time + value bool } func EncodeBoolBlock(buf []byte, values []BoolValue) []byte { @@ -69,8 +144,8 @@ func DecodeBoolBlock(block []byte) ([]BoolValue, error) { } type Int64Value struct { - Time time.Time - Value int64 + time time.Time + value int64 } func EncodeInt64Block(buf []byte, values []Int64Value) []byte { @@ -82,8 +157,8 @@ func DecodeInt64Block(block []byte) ([]Int64Value, error) { } type StringValue struct { - Time time.Time - Value string + time time.Time + value string } func EncodeStringBlock(buf []byte, values []StringValue) []byte { diff --git a/tsdb/engine/pd1/encoding_test.go b/tsdb/engine/pd1/encoding_test.go index 26bb4c2e07..aa5a4b15e7 100644 --- a/tsdb/engine/pd1/encoding_test.go +++ b/tsdb/engine/pd1/encoding_test.go @@ -1,32 +1,32 @@ package pd1_test import ( - "math/rand" - "reflect" + // "math/rand" + // "reflect" "testing" "time" - "github.com/influxdb/influxdb/tsdb/engine/pd1" + // "github.com/influxdb/influxdb/tsdb/engine/pd1" ) func TestEncoding_FloatBlock(t *testing.T) { - valueCount := 100 - times := getTimes(valueCount, 60, time.Second) - values := make([]pd1.FloatValue, len(times)) - for i, t := range times { - values[i] = pd1.FloatValue{Time: t, Value: rand.Float64()} - } + // valueCount := 100 + // times := getTimes(valueCount, 60, time.Second) + // values := make([]Value, len(times)) + // for i, t := range times { + // values[i] = pd1.NewValue(t, rand.Float64()) + // } - b := pd1.EncodeFloatBlock(nil, values) + // b := pd1.EncodeFloatBlock(nil, values) - decodedValues, err := pd1.DecodeFloatBlock(b) - if err != nil { - t.Fatalf("error decoding: %s", err.Error) - } + // decodedValues, err := pd1.DecodeFloatBlock(b) + // if err != nil { + // t.Fatalf("error decoding: %s", err.Error) + // } - if !reflect.DeepEqual(decodedValues, values) { - t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values) - } + // if !reflect.DeepEqual(decodedValues, values) { + // t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values) + // } } func getTimes(n, step int, precision time.Duration) []time.Time { diff --git a/tsdb/engine/pd1/pd1.go b/tsdb/engine/pd1/pd1.go index 4d3c752ae2..ef91fb66e2 100644 --- a/tsdb/engine/pd1/pd1.go +++ b/tsdb/engine/pd1/pd1.go @@ -1,7 +1,6 @@ package pd1 import ( - "bytes" "encoding/binary" "encoding/json" "fmt" @@ -12,8 +11,6 @@ import ( "os" "path/filepath" "sort" - "strconv" - "strings" "sync" "syscall" "time" @@ -29,13 +26,13 @@ const ( // FieldsFileExtension is the extension for the file that stores compressed field // encoding data for this db - FieldsFileExtension = "fld" + FieldsFileExtension = "fields" // SeriesFileExtension is the extension for the file that stores the compressed // series metadata for series in this db - SeriesFileExtension = "srs" + SeriesFileExtension = "series" - CollisionsFileExtension = "col" + CollisionsFileExtension = "collisions" ) type TimePrecision uint8 @@ -55,7 +52,7 @@ const ( // DefaultBlockSize is the default size of uncompressed points blocks. 
DefaultBlockSize = 512 * 1024 // 512KB - DefaultMaxFileSize = 5 * 1024 * 1024 // 5MB + DefaultMaxFileSize = 10 * 1024 * 1024 // 10MB DefaultMaxPointsPerBlock = 1000 @@ -80,6 +77,8 @@ type Engine struct { FieldCodec(measurementName string) *tsdb.FieldCodec } + WAL *Log + filesLock sync.RWMutex files dataFiles currentFileID int @@ -88,11 +87,19 @@ type Engine struct { // NewEngine returns a new instance of Engine. func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine { + w := NewLog(path) + w.FlushColdInterval = time.Duration(opt.Config.WALFlushColdInterval) + w.MemorySizeThreshold = int(opt.Config.WALPartitionSizeThreshold) + w.LoggingEnabled = opt.Config.WALLoggingEnabled + e := &Engine{ path: path, + // TODO: this is the function where we can inject a check against the in memory collisions HashSeriesField: hashSeriesField, + WAL: w, } + e.WAL.Index = e return e } @@ -116,6 +123,13 @@ func (e *Engine) Open() error { return err } for _, fn := range files { + id, err := idFromFileName(fn) + if err != nil { + return err + } + if id >= e.currentFileID { + e.currentFileID = id + 1 + } f, err := os.OpenFile(fn, os.O_RDONLY, 0666) if err != nil { return fmt.Errorf("error opening file %s: %s", fn, err.Error()) @@ -128,6 +142,10 @@ func (e *Engine) Open() error { } sort.Sort(e.files) + if err := e.WAL.Open(); err != nil { + return err + } + return nil } @@ -189,12 +207,10 @@ func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, // WritePoints writes metadata and point data into the engine. // Returns an error if new points are added to an existing key. func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { - // TODO: Write points to the WAL - - return e.WriteAndCompact(points, measurementFieldsToSave, seriesToCreate) + return e.WAL.WritePoints(points, measurementFieldsToSave, seriesToCreate) } -func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { +func (e *Engine) WriteAndCompact(pointsByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { e.mu.Lock() defer e.mu.Unlock() @@ -205,31 +221,70 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma return err } - if len(points) == 0 { + if len(pointsByKey) == 0 { return nil } - b, err := e.readCompressedFile("names") + // read in keys and assign any that aren't defined + b, err := e.readCompressedFile("ids") if err != nil { return err } - ids := make(map[uint64]string) - - var names []string + ids := make(map[string]uint64) if b != nil { - if err := json.Unmarshal(b, &names); err != nil { + if err := json.Unmarshal(b, &ids); err != nil { return err } - - for _, n := range names { - ids[e.HashSeriesField(n)] = n - } } // these are values that are newer than anything stored in the shard - valuesByID := make(map[uint64]*valueCollection) - // map the points to the data file they belong to if they overlap - dataFileToValues := make(map[*dataFile]map[uint64]*valueCollection) + valuesByID := make(map[uint64]Values) + + idToKey := make(map[uint64]string) // we only use this map if new ids are being created + newKeys := false + for k, values := range pointsByKey { + var id uint64 + var ok bool + if id, ok = ids[k]; !ok { + // populate the map if we haven't already + if len(idToKey) == 0 { + for n, id := 
range ids { + idToKey[id] = n + } + } + + // now see if the hash id collides with a different key + hashID := hashSeriesField(k) + existingKey, idInMap := idToKey[hashID] + if idInMap { + // we only care if the keys are different. if so, it's a hash collision we have to keep track of + if k != existingKey { + // we have a collision, give this new key a different id and move on + // TODO: handle collisions + panic("name collision, not implemented yet!") + } + } else { + newKeys = true + ids[k] = hashID + idToKey[id] = k + id = hashID + } + } + + valuesByID[id] = values + } + + if newKeys { + b, err := json.Marshal(ids) + if err != nil { + return err + } + if err := e.replaceCompressedFile("ids", b); err != nil { + return err + } + } + + // TODO: handle values written in the past that force an old data file to get rewritten // we keep track of the newest data file and if it should be // rewritten with new data. @@ -240,87 +295,6 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma overwriteNewestFile = newestDataFile.size < DefaultMaxFileSize } - // compute ids of new keys and arrange for insertion - for _, p := range points { - for fn, val := range p.Fields() { - n := seriesFieldKey(string(p.Key()), fn) - id := e.HashSeriesField(n) - if series, ok := ids[id]; !ok { - names = append(names, n) - } else { // possible collision? - if n != series { - // TODO: implement collision detection - panic("name collision!") - } - } - - ids[id] = n - - vals := valuesByID[id] - if vals == nil { - // TODO: deal with situation where there are already files, - // but the user is inserting a bunch of data that predates - // any of them. It's ok to rewrite the first file, but - // only to max size. Then we should create a new one - - // points always come in time increasing order. This is - // the first point we've seen for this key. So it might - // need to get put into an older file instead of a new - // one. 
Check and set accordingly - var df *dataFile - for i := len(e.files) - 1; i >= 0; i-- { - if p.UnixNano() > e.files[i].MaxTime() { - break - } - df = e.files[i] - } - vals = &valueCollection{} - - if df == nil || (df == newestDataFile && overwriteNewestFile) { - // this point is newer than anything we have stored - // or it belongs in the most recent file, which should get - // rewritten - valuesByID[id] = vals - } else { - // it overlaps with another file so mark it and it can be compacted - dfm := dataFileToValues[df] - if dfm == nil { - dfm = make(map[uint64]*valueCollection) - dataFileToValues[df] = dfm - } - - if vc := dfm[id]; vc == nil { - dfm[id] = vals - } else { - vals = vc - } - } - } - - switch t := val.(type) { - case float64: - vals.floatValues = append(vals.floatValues, FloatValue{Time: p.Time(), Value: t}) - case int64: - vals.intValues = append(vals.intValues, Int64Value{Time: p.Time(), Value: t}) - case bool: - vals.boolValues = append(vals.boolValues, BoolValue{Time: p.Time(), Value: t}) - case string: - vals.stringValues = append(vals.stringValues, StringValue{Time: p.Time(), Value: t}) - default: - panic("unsupported type") - } - } - } - - b, err = json.Marshal(names) - if err != nil { - return err - } - - if err := e.replaceCompressedFile("names", b); err != nil { - return err - } - // flush values by id to either a new file or rewrite the old one if overwriteNewestFile { if err := e.rewriteFile(newestDataFile, valuesByID); err != nil { @@ -330,21 +304,14 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma return err } - // flush each of the old ones - for df, vals := range dataFileToValues { - if err := e.rewriteFile(df, vals); err != nil { - return err - } - } - return nil } -func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection) error { +func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) error { // we need the values in sorted order so that we can merge them into the // new file as we read the old file - ids := make([]uint64, 0, len(values)) - for id, _ := range values { + ids := make([]uint64, 0, len(valuesByID)) + for id, _ := range valuesByID { ids = append(ids, id) } @@ -358,18 +325,18 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection minTime = oldDF.MinTime() maxTime = oldDF.MaxTime() } - for _, v := range values { - if minTime > v.MinTime().UnixNano() { - minTime = v.MinTime().UnixNano() + for _, v := range valuesByID { + if minTime > v.MinTime() { + minTime = v.MinTime() } - if maxTime < v.MaxTime().UnixNano() { - maxTime = v.MaxTime().UnixNano() + if maxTime < v.MaxTime() { + maxTime = v.MaxTime() } } // add any ids that are in the file that aren't getting flushed here for id, _ := range oldIDToPosition { - if _, ok := values[id]; !ok { + if _, ok := valuesByID[id]; !ok { ids = append(ids, id) } } @@ -414,10 +381,10 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection // mark the position for this ID newPositions[i] = currentPosition - newVals := values[id] + newVals := valuesByID[id] // if this id is only in the file and not in the new values, just copy over from old file - if newVals == nil { + if len(newVals) == 0 { fpos := oldIDToPosition[id] // write the blocks until we hit whatever the next id is @@ -482,7 +449,8 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection } } - newBlock, err := newVals.DecodeAndCombine(block, buf[:0], nextTime, hasFutureBlock) + nv, newBlock, err 
:= e.DecodeAndCombine(newVals, block, buf[:0], nextTime, hasFutureBlock) + newVals = nv if err != nil { return err } @@ -503,7 +471,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection } // TODO: ensure we encode only the amount in a block, refactor this wil line 450 into func - if len(newVals.floatValues) > 0 { + if len(newVals) > 0 { // TODO: ensure we encode only the amount in a block block := newVals.Encode(buf) if _, err := f.Write(append(u64tob(id), u32tob(uint32(len(block)))...)); err != nil { @@ -820,96 +788,39 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) { return series, nil } -type valueCollection struct { - floatValues []FloatValue - boolValues []BoolValue - intValues []Int64Value - stringValues []StringValue -} - -func (v *valueCollection) MinTime() time.Time { - if v.floatValues != nil { - return v.floatValues[0].Time - } else if v.boolValues != nil { - return v.boolValues[0].Time - } else if v.intValues != nil { - return v.intValues[0].Time - } else if v.stringValues != nil { - return v.stringValues[0].Time - } - - return time.Unix(0, 0) -} - -func (v *valueCollection) MaxTime() time.Time { - if v.floatValues != nil { - return v.floatValues[len(v.floatValues)-1].Time - } else if v.boolValues != nil { - return v.boolValues[len(v.boolValues)-1].Time - } else if v.intValues != nil { - return v.intValues[len(v.intValues)-1].Time - } else if v.stringValues != nil { - return v.stringValues[len(v.stringValues)-1].Time - } - - return time.Unix(0, 0) -} - -func (v *valueCollection) Encode(buf []byte) []byte { - if v.floatValues != nil { - return EncodeFloatBlock(buf, v.floatValues) - } else if v.boolValues != nil { - return EncodeBoolBlock(buf, v.boolValues) - } else if v.intValues != nil { - return EncodeInt64Block(buf, v.intValues) - } else if v.stringValues != nil { - return EncodeStringBlock(buf, v.stringValues) - } - - return nil -} - // DecodeAndCombine take an encoded block from a file, decodes it and interleaves the file -// values with the values in this collection. nextTime and hasNext refer to if the file +// values with the values passed in. nextTime and hasNext refer to if the file // has future encoded blocks so that this method can know how much of its values can be // combined and output in the resulting encoded block. -func (v *valueCollection) DecodeAndCombine(block, buf []byte, nextTime int64, hasFutureBlock bool) ([]byte, error) { - if v.floatValues != nil { - values, err := DecodeFloatBlock(block) - if err != nil { - return nil, err - } +func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) { + values := newValues.DecodeSameTypeBlock(block) - if hasFutureBlock { - // take all values that have times less than the future block and update the vals array - pos := sort.Search(len(v.floatValues), func(i int) bool { - return v.floatValues[i].Time.UnixNano() >= nextTime - }) - values = append(values, v.floatValues[:pos]...) - v.floatValues = v.floatValues[pos:] - } else { - values = append(values, v.floatValues...) 
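For orientation while reading rewriteFile above: each block it writes is framed as an 8-byte series ID, a 4-byte block length, and then the encoded block, whose first 8 bytes are the minimum timestamp that EncodeFloatBlock prepends. A hypothetical walker over a run of such blocks (it assumes the slice starts at a blocks region; the ID/position index the file also carries is not handled here):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// walkBlocks is illustrative only: it scans consecutive blocks as written by
// rewriteFile ([8-byte id][4-byte length][block bytes]) and reports each
// series ID, block length, and the block's minimum-timestamp prefix.
func walkBlocks(data []byte) {
	for len(data) >= 12 {
		id := binary.BigEndian.Uint64(data[0:8])
		blockLen := int(binary.BigEndian.Uint32(data[8:12]))
		if blockLen < 8 || len(data) < 12+blockLen {
			return // truncated input or not a blocks region
		}

		// EncodeFloatBlock prefixes each block with the first value's
		// UnixNano time, so the min time is readable without decoding
		// the tsz stream that follows it.
		block := data[12 : 12+blockLen]
		minTime := int64(binary.BigEndian.Uint64(block[:8]))
		fmt.Printf("id=%d len=%d minTime=%d\n", id, blockLen, minTime)

		data = data[12+blockLen:]
	}
}

func main() {
	walkBlocks(nil) // no-op on empty input; feed real file bytes to use it
}
```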
- v.floatValues = nil - } - sort.Sort(FloatValues(values)) - // TODO: deduplicate values + var remainingValues Values - if len(values) > DefaultMaxPointsPerBlock { - v.floatValues = values[DefaultMaxPointsPerBlock:] - values = values[:DefaultMaxPointsPerBlock] + if hasFutureBlock { + // take all values that have times less than the future block and update the vals array + pos := sort.Search(len(newValues), func(i int) bool { + return newValues[i].Time().UnixNano() >= nextTime + }) + values = append(values, newValues[:pos]...) + remainingValues = newValues[pos:] + sort.Sort(values) + } else { + requireSort := values.MaxTime() > newValues.MinTime() + values = append(values, newValues...) + if requireSort { + sort.Sort(values) } - - return EncodeFloatBlock(buf, values), nil - } else if v.boolValues != nil { - // TODO: wire up the other value types - return nil, fmt.Errorf("not implemented") - } else if v.intValues != nil { - return nil, fmt.Errorf("not implemented") - } else if v.stringValues != nil { - return nil, fmt.Errorf("not implemented") } - return nil, nil + // TODO: deduplicate values + + if len(values) > DefaultMaxPointsPerBlock { + remainingValues = values[DefaultMaxPointsPerBlock:] + values = values[:DefaultMaxPointsPerBlock] + } + + return remainingValues, values.Encode(buf), nil } type dataFile struct { @@ -1040,7 +951,7 @@ type cursor struct { f *dataFile filesPos int // the index in the files slice we're looking at pos uint32 - vals FloatValues + vals Values direction tsdb.Direction @@ -1121,7 +1032,7 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) { // wasn't in the first value popped out of the block, check the rest for i, v := range c.vals { - if v.Time.UnixNano() >= t { + if v.Time().UnixNano() >= t { c.vals = c.vals[i+1:] return v.TimeBytes(), v.ValueBytes() } @@ -1220,180 +1131,3 @@ type uint64slice []uint64 func (a uint64slice) Len() int { return len(a) } func (a uint64slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a uint64slice) Less(i, j int) bool { return a[i] < a[j] } - -/* TODO: REMOVE THIS STUFF */ -func (e *Engine) pointsToBlocks(points [][]byte) []byte { - var b bytes.Buffer - block := make([]byte, 0) - for _, p := range points { - block = append(block, p[0:8]...) - block = append(block, u32tob(uint32(len(p)-8))...) - block = append(block, p[8:]...) 
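Stepping back to the new engine-level DecodeAndCombine above: when a later block exists on disk (hasFutureBlock), only the pending values with timestamps before that block's start are merged in, and everything else comes back as remainingValues for the next iteration; the same mechanism caps a block at DefaultMaxPointsPerBlock. The split itself is just a sort.Search, illustrated standalone here:

```go
package main

import (
	"fmt"
	"sort"
	"time"

	"github.com/influxdb/influxdb/tsdb/engine/pd1"
)

func main() {
	// Pending values for one series, already time ascending.
	newValues := pd1.Values{
		pd1.NewValue(time.Unix(0, 5), 0.5),
		pd1.NewValue(time.Unix(0, 15), 1.5),
		pd1.NewValue(time.Unix(0, 25), 2.5),
	}

	// nextTime is the minimum timestamp of the next stored block; values
	// at or after it must wait for that block's rewrite.
	nextTime := int64(20)
	pos := sort.Search(len(newValues), func(i int) bool {
		return newValues[i].Time().UnixNano() >= nextTime
	})
	take, remaining := newValues[:pos], newValues[pos:]
	fmt.Println(len(take), len(remaining)) // prints: 2 1
}
```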
- if len(block) > DefaultBlockSize { - e.writeBlockToBuffer(block, &b) - block = make([]byte, 0) - } - } - if len(block) > 0 { - e.writeBlockToBuffer(block, &b) - } - - return b.Bytes() -} - -func (e *Engine) writeBlockToBuffer(block []byte, b *bytes.Buffer) { - // write the min time - if _, err := b.Write(block[0:8]); err != nil { - panic(err) - } - - // write the length of the compressed data - data := snappy.Encode(nil, block) - if _, err := b.Write(u32tob(uint32(len(data)))); err != nil { - panic(err) - } - - // write the compressed data - if _, err := b.Write(data); err != nil { - panic(err) - } -} - -func (e *Engine) readPointsFromFile(f *os.File) (map[uint64][][]byte, error) { - buf := make([]byte, 8) - if _, err := io.ReadFull(f, buf); err != nil { - return nil, err - } - seriesCount := btou64(buf) - positions := make([]uint64, seriesCount, seriesCount) - ids := make([]uint64, seriesCount, seriesCount) - - // read the series index file header - position := uint64(8) - for i := 0; uint64(i) < seriesCount; i++ { - // read the id of the series - if _, err := io.ReadFull(f, buf); err != nil { - return nil, err - } - ids[i] = btou64(buf) - - // read the min time and ignore - if _, err := io.ReadFull(f, buf); err != nil { - return nil, err - } - if _, err := io.ReadFull(f, buf); err != nil { - return nil, err - } - - // read the starting position of this id - if _, err := io.ReadFull(f, buf); err != nil { - return nil, err - } - positions[i] = btou64(buf) - position += 32 - } - - if position != positions[0] { - panic("we aren't at the right place") - } - - // read the raw data - seriesData := make(map[uint64][][]byte) - compressedBuff := make([]byte, DefaultBlockSize) - seriesPosition := 0 - for { - // read the min time and ignore - if _, err := io.ReadFull(f, buf); err == io.EOF { - break - } else if err != nil { - return nil, err - } - - // read the length of the compressed block - if _, err := io.ReadFull(f, buf[:4]); err != nil { - return nil, err - } - length := btou32(buf) - - if length > uint32(len(compressedBuff)) { - compressedBuff = make([]byte, length) - } - if _, err := io.ReadFull(f, compressedBuff[:length]); err != nil { - return nil, err - } - - data, err := snappy.Decode(nil, compressedBuff[:length]) - if err != nil { - return nil, err - } - id := ids[seriesPosition] - seriesData[id] = append(seriesData[id], e.pointsFromDataBlock(data)...) - position += uint64(12 + length) - - if seriesPosition+1 >= len(positions) { - continue - } - if positions[seriesPosition+1] == position { - seriesPosition += 1 - } - } - - return seriesData, nil -} - -func (e *Engine) pointsFromDataBlock(data []byte) [][]byte { - a := make([][]byte, 0) - for { - length := entryDataSize(data) - p := append(data[:8], data[12:12+length]...) 
- a = append(a, p) - data = data[12+length:] - if len(data) == 0 { - break - } - } - return a -} - -func entryDataSize(v []byte) int { return int(binary.BigEndian.Uint32(v[8:12])) } - -func (e *Engine) lastFileAndNewFile() (*os.File, *os.File, error) { - files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", Format))) - if err != nil { - return nil, nil, err - } - - if len(files) == 0 { - newFile, err := os.OpenFile(filepath.Join(e.path, fmt.Sprintf("%07d.%s", 1, Format)), os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - return nil, nil, err - } - return nil, newFile, nil - } - - oldFile, err := os.OpenFile(files[len(files)-1], os.O_RDONLY, 0666) - if err != nil { - return nil, nil, err - } - - info, err := oldFile.Stat() - if err != nil { - _ = oldFile.Close() - return nil, nil, err - } - - num := strings.Split(filepath.Base(files[len(files)-1]), ".")[0] - n, err := strconv.ParseUint(num, 10, 32) - if err != nil { - return nil, nil, err - } - newFile, err := os.OpenFile(filepath.Join(e.path, fmt.Sprintf("%07d.%s", n+1, Format)), os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - return nil, nil, err - } - if info.Size() >= DefaultMaxFileSize { - oldFile.Close() - return nil, newFile, nil - } - return oldFile, newFile, nil -} diff --git a/tsdb/engine/pd1/pd1_test.go b/tsdb/engine/pd1/pd1_test.go index 7d915aab69..79817eb531 100644 --- a/tsdb/engine/pd1/pd1_test.go +++ b/tsdb/engine/pd1/pd1_test.go @@ -25,7 +25,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { p3 := parsePoint("cpu,host=A value=2.1 2000000000") p4 := parsePoint("cpu,host=B value=2.2 2000000000") - if err := e.WriteAndCompact([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil { + if err := e.WritePoints([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } @@ -68,7 +68,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) { } verify(true) - if err := e.WriteAndCompact([]tsdb.Point{p4}, nil, nil); err != nil { + if err := e.WritePoints([]tsdb.Point{p4}, nil, nil); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } verify(false) @@ -123,13 +123,13 @@ func TestEngine_WriteIndexBenchmarkNames(t *testing.T) { } st := time.Now() - if err := e.WriteAndCompact(points, nil, nil); err != nil { + if err := e.WritePoints(points, nil, nil); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } fmt.Println("took: ", time.Since(st)) st = time.Now() - if err := e.WriteAndCompact(points, nil, nil); err != nil { + if err := e.WritePoints(points, nil, nil); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } fmt.Println("took: ", time.Since(st)) @@ -161,6 +161,7 @@ func OpenEngine(opt tsdb.EngineOptions) *Engine { if err := e.Open(); err != nil { panic(err) } + e.WAL.SkipCache = true return e } diff --git a/tsdb/engine/pd1/wal.go b/tsdb/engine/pd1/wal.go new file mode 100644 index 0000000000..26f2af48ff --- /dev/null +++ b/tsdb/engine/pd1/wal.go @@ -0,0 +1,581 @@ +package pd1 + +import ( + "bytes" + "fmt" + "io" + "log" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/golang/snappy" + "github.com/influxdb/influxdb/tsdb" +) + +const ( + // DefaultSegmentSize of 2MB is the size at which segment files will be rolled over + DefaultSegmentSize = 2 * 1024 * 1024 + + // FileExtension is the file extension we expect for wal segments + WALFileExtension = "wal" + + WALFilePrefix = "_" + + // defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria + 
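The prefix and extension above give segment files names like _00001.wal; newSegmentFile formats them and idFromFileName (at the bottom of this file) parses the ID back out. A standalone sketch of that round-trip, with a placeholder directory, mirroring those two functions rather than calling them:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

// segmentName mirrors newSegmentFile's naming: prefix, zero-padded ID, extension.
func segmentName(dir string, id int) string {
	return filepath.Join(dir, fmt.Sprintf("_%05d.wal", id))
}

// segmentID mirrors idFromFileName: strip the prefix and parse the number.
func segmentID(name string) (int, error) {
	parts := strings.Split(filepath.Base(name), ".")
	if len(parts) != 2 {
		return 0, fmt.Errorf("file %s has wrong name format to have an id", name)
	}
	id, err := strconv.ParseUint(parts[0][1:], 10, 32) // skip the "_" prefix
	return int(id), err
}

func main() {
	name := segmentName("/tmp/pd1-wal", 42) // placeholder directory
	id, _ := segmentID(name)
	fmt.Println(name, id) // prints: /tmp/pd1-wal/_00042.wal 42
}
```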
defaultFlushCheckInterval = time.Second +) + +// flushType indiciates why a flush and compaction are being run so the partition can +// do the appropriate type of compaction +type flushType int + +const ( + // noFlush indicates that no flush or compaction are necesssary at this time + noFlush flushType = iota + // memoryFlush indicates that we should look for the series using the most + // memory to flush out and compact all others + memoryFlush + // idleFlush indicates that we should flush all series in the parition, + // delete all segment files and hold off on opening a new one + idleFlush + // deleteFlush indicates that we're flushing because series need to be removed from the WAL + deleteFlush + + writeBufLen = 32 << 10 // 32kb +) + +// walEntry is a byte written to a wal segment file that indicates what the following compressed block contains +type walEntryType byte + +const ( + pointsEntry walEntryType = 0x01 + fieldsEntry walEntryType = 0x02 + seriesEntry walEntryType = 0x03 +) + +type Log struct { + path string + + flushCheckTimer *time.Timer // check this often to see if a background flush should happen + flushCheckInterval time.Duration + + // write variables + writeLock sync.Mutex + currentSegmentID int + currentSegmentFile *os.File + currentSegmentSize int + lastWriteTime time.Time + flushRunning bool + + // cache variables + cacheLock sync.RWMutex + cache map[string]Values + cacheDirtySort map[string]bool // this map should be small, only for dirty vals + flushCache map[string]Values // temporary map while flushing + memorySize int + measurementFieldsCache map[string]*tsdb.MeasurementFields + seriesToCreateCache []*tsdb.SeriesCreate + + // These coordinate closing and waiting for running goroutines. + wg sync.WaitGroup + closing chan struct{} + + // LogOutput is the writer used by the logger. + LogOutput io.Writer + logger *log.Logger + + // FlushColdInterval is the period of time after which a partition will do a + // full flush and compaction if it has been cold for writes. + FlushColdInterval time.Duration + + // SegmentSize is the file size at which a segment file will be rotated + SegmentSize int + + // MemorySizeThreshold specifies when the log should be forced to be flushed. + MemorySizeThreshold int + + // Index is the database series will be flushed to + Index IndexWriter + + // LoggingEnabled specifies if detailed logs should be output + LoggingEnabled bool + + // SkipCache specifies if the wal should immediately write to the index instead of + // caching data in memory. False by default so we buffer in memory before flushing to index. + SkipCache bool + + // SkipDurability specifies if the wal should not write the wal entries to disk. + // False by default which means all writes are durable even when cached before flushing to index. 
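The exported fields above are the WAL's tuning knobs, and they are meant to be set between NewLog and Open, which is how NewEngine wires them from the tsdb config. A sketch of that wiring with placeholder values and a throwaway IndexWriter (in the engine, Index is the pd1.Engine itself):

```go
package main

import (
	"log"
	"time"

	"github.com/influxdb/influxdb/tsdb"
	"github.com/influxdb/influxdb/tsdb/engine/pd1"
)

// nopIndex satisfies IndexWriter but discards whatever the WAL flushes.
type nopIndex struct{}

func (nopIndex) WriteAndCompact(map[string]pd1.Values, map[string]*tsdb.MeasurementFields, []*tsdb.SeriesCreate) error {
	return nil
}

func main() {
	// Mirrors the wiring in NewEngine; the path and numbers are placeholders.
	w := pd1.NewLog("/tmp/pd1-wal")
	w.FlushColdInterval = 5 * time.Minute
	w.MemorySizeThreshold = 50 * 1024 * 1024
	w.SegmentSize = pd1.DefaultSegmentSize
	w.LoggingEnabled = true
	w.Index = nopIndex{}

	if err := w.Open(); err != nil {
		log.Fatal(err)
	}
	defer w.Close()
}
```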
+ SkipDurability bool +} + +// IndexWriter is an interface for the indexed database the WAL flushes data to +type IndexWriter interface { + WriteAndCompact(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error +} + +func NewLog(path string) *Log { + return &Log{ + path: path, + + // these options should be overriden by any options in the config + LogOutput: os.Stderr, + FlushColdInterval: tsdb.DefaultFlushColdInterval, + SegmentSize: DefaultSegmentSize, + MemorySizeThreshold: tsdb.DefaultPartitionSizeThreshold, + flushCheckInterval: defaultFlushCheckInterval, + logger: log.New(os.Stderr, "[pwl] ", log.LstdFlags), + } +} + +// Open opens and initializes the Log. Will recover from previous unclosed shutdowns +func (l *Log) Open() error { + + if l.LoggingEnabled { + l.logger.Printf("PD1 WAL starting with %d memory size threshold\n", l.MemorySizeThreshold) + l.logger.Printf("WAL writing to %s\n", l.path) + } + if err := os.MkdirAll(l.path, 0777); err != nil { + return err + } + + l.cache = make(map[string]Values) + l.cacheDirtySort = make(map[string]bool) + l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields) + // TODO: read segment files and flush them all to disk + + l.flushCheckTimer = time.NewTimer(l.flushCheckInterval) + + // Start background goroutines. + l.wg.Add(1) + l.closing = make(chan struct{}) + go l.autoflusher(l.closing) + + return nil +} + +// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given +func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { + l.cacheLock.RLock() + defer l.cacheLock.RUnlock() + + // TODO: make this work for other fields + ck := seriesFieldKey(key, "value") + values := l.cache[ck] + + // if we're in the middle of a flush, combine the previous cache + // with this one for the cursor + if l.flushCache != nil { + if fc, ok := l.flushCache[ck]; ok { + c := make([]Value, len(fc), len(fc)+len(values)) + copy(c, fc) + c = append(c, values...) 
+ + return newWALCursor(c, direction) + } + } + + if l.cacheDirtySort[ck] { + sort.Sort(values) + delete(l.cacheDirtySort, ck) + } + + // build a copy so writes afterwards don't change the result set + a := make([]Value, len(values)) + copy(a, values) + return newWALCursor(a, direction) +} + +func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { + // make the write durable if specified + if !l.SkipDurability { + pointStrings := make([]string, len(points)) + for i, p := range points { + pointStrings[i] = p.String() + } + data := strings.Join(pointStrings, "\n") + compressed := snappy.Encode(nil, []byte(data)) + + if err := l.writeToLog(pointsEntry, compressed); err != nil { + return err + } + + // TODO: write the fields + + // TODO: write the series + } + + // convert to values that can be either cached in memory or flushed to the index + l.cacheLock.Lock() + for _, p := range points { + for name, value := range p.Fields() { + k := seriesFieldKey(string(p.Key()), name) + v := NewValue(p.Time(), value) + cacheValues := l.cache[k] + + // only mark it as dirty if it isn't already + if _, ok := l.cacheDirtySort[k]; !ok && len(cacheValues) > 0 { + dirty := cacheValues[len(cacheValues)-1].Time().UnixNano() > v.Time().UnixNano() + if dirty { + l.cacheDirtySort[k] = true + } + } + l.memorySize += v.Size() + l.cache[k] = append(cacheValues, v) + } + } + + for k, v := range fields { + l.measurementFieldsCache[k] = v + } + l.seriesToCreateCache = append(l.seriesToCreateCache, series...) + l.lastWriteTime = time.Now() + l.cacheLock.Unlock() + + // usually skipping the cache is only for testing purposes and this was the easiest + // way to represent the logic (to cache and then immediately flush) + if l.SkipCache { + l.flush(idleFlush) + } + + return nil +} + +func (l *Log) writeToLog(writeType walEntryType, data []byte) error { + l.writeLock.Lock() + defer l.writeLock.Unlock() + + if l.currentSegmentFile == nil { + l.newSegmentFile() + } + + if _, err := l.currentSegmentFile.Write([]byte{byte(writeType)}); err != nil { + panic(fmt.Sprintf("error writing type to wal: %s", err.Error())) + } + if _, err := l.currentSegmentFile.Write(u32tob(uint32(len(data)))); err != nil { + panic(fmt.Sprintf("error writing len to wal: %s", err.Error())) + } + if _, err := l.currentSegmentFile.Write(data); err != nil { + panic(fmt.Sprintf("error writing data to wal: %s", err.Error())) + } + + return l.currentSegmentFile.Sync() +} + +// Flush will force a flush of the WAL to the index +func (l *Log) Flush() error { + return l.flush(idleFlush) +} + +func (l *Log) DeleteSeries(keys []string) error { + panic("not implemented") +} + +// Close will finish any flush that is currently in process and close file handles +func (l *Log) Close() error { + // stop the autoflushing process so it doesn't try to kick another one off + l.writeLock.Lock() + l.cacheLock.Lock() + + if l.closing != nil { + close(l.closing) + l.closing = nil + } + l.writeLock.Unlock() + l.cacheLock.Unlock() + + // Allow goroutines to finish running. + l.wg.Wait() + + // Lock the remainder of the closing process. 
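Open still has a TODO to replay existing segment files. Given the framing writeToLog produces (a one-byte entry type, a big-endian uint32 length, then the payload, which for pointsEntry is a snappy-compressed, newline-joined batch of point strings per WritePoints above), a replay loop could look roughly like this hypothetical helper:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"os"
	"strings"

	"github.com/golang/snappy"
)

// Entry type as written by writeToLog (pointsEntry = 0x01).
const pointsEntry = 0x01

// replaySegment is a hypothetical reader for one WAL segment file: it walks
// the [type][len][payload] framing and prints the line-protocol points from
// each points entry. Error handling is kept minimal for brevity.
func replaySegment(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	header := make([]byte, 5)
	for {
		if _, err := io.ReadFull(f, header); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		entryType := header[0]
		length := binary.BigEndian.Uint32(header[1:5])

		payload := make([]byte, length)
		if _, err := io.ReadFull(f, payload); err != nil {
			return err
		}

		if entryType == pointsEntry {
			data, err := snappy.Decode(nil, payload)
			if err != nil {
				return err
			}
			for _, line := range strings.Split(string(data), "\n") {
				fmt.Println(line)
			}
		}
	}
}

func main() {
	if err := replaySegment("/tmp/pd1-wal/_00001.wal"); err != nil { // placeholder path
		log.Fatal(err)
	}
}
```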
+ l.writeLock.Lock() + l.cacheLock.Lock() + defer l.writeLock.Unlock() + defer l.cacheLock.Unlock() + + l.cache = nil + l.measurementFieldsCache = nil + l.seriesToCreateCache = nil + if l.currentSegmentFile == nil { + return nil + } + if err := l.currentSegmentFile.Close(); err != nil { + return err + } + l.currentSegmentFile = nil + + return nil +} + +// close all the open Log partitions and file handles +func (l *Log) close() error { + l.cache = nil + l.cacheDirtySort = nil + if l.currentSegmentFile == nil { + return nil + } + if err := l.currentSegmentFile.Close(); err != nil { + return err + } + l.currentSegmentFile = nil + + return nil +} + +// flush writes all wal data in memory to the index +func (l *Log) flush(flush flushType) error { + l.writeLock.Lock() + if l.flushRunning { + l.writeLock.Unlock() + return nil + } + + l.flushRunning = true + defer func() { + l.writeLock.Lock() + l.flushRunning = false + l.writeLock.Unlock() + }() + lastFileID := l.currentSegmentID + if err := l.newSegmentFile(); err != nil { + // there's no recovering from this, fail hard + panic(fmt.Sprintf("error creating new wal file: %s", err.Error())) + } + l.writeLock.Unlock() + + // copy the cache items to new maps so we can empty them out + l.cacheLock.Lock() + + // move over the flush cache and make a copy to write + l.flushCache = l.cache + l.cache = make(map[string]Values) + l.cacheDirtySort = make(map[string]bool) + valuesByKey := make(map[string]Values) + + valueCount := 0 + for key, v := range l.flushCache { + valuesByKey[key] = v + valueCount += len(v) + } + + if l.LoggingEnabled { + ftype := "idle" + if flush == memoryFlush { + ftype = "memory" + } + l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, l.memorySize) + } + + // reset the memory being used by the cache + l.memorySize = 0 + + // reset the measurements for flushing + mfc := l.measurementFieldsCache + l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields) + + // reset the series for flushing + scc := l.seriesToCreateCache + l.seriesToCreateCache = nil + + l.cacheLock.Unlock() + + startTime := time.Now() + if err := l.Index.WriteAndCompact(valuesByKey, mfc, scc); err != nil { + return err + } + if l.LoggingEnabled { + l.logger.Printf("flush to index took %s\n", time.Since(startTime)) + } + + l.cacheLock.Lock() + l.flushCache = nil + l.cacheLock.Unlock() + + // remove all the old segment files + fileNames, err := l.segmentFileNames() + if err != nil { + return err + } + for _, fn := range fileNames { + id, err := idFromFileName(fn) + if err != nil { + return err + } + if id <= lastFileID { + err := os.Remove(fn) + if err != nil { + return err + } + } + } + + return nil +} + +// triggerAutoFlush will flush and compact any partitions that have hit the thresholds for compaction +func (l *Log) triggerAutoFlush() { + if f := l.shouldFlush(); f != noFlush { + if err := l.flush(f); err != nil { + l.logger.Printf("error flushing wal: %s\n", err) + } + } +} + +// autoflusher waits for notification of a flush and kicks it off in the background. +// This method runs in a separate goroutine. +func (l *Log) autoflusher(closing chan struct{}) { + defer l.wg.Done() + + for { + // Wait for close or flush signal. 
+ select { + case <-closing: + return + case <-l.flushCheckTimer.C: + l.triggerAutoFlush() + l.flushCheckTimer.Reset(l.flushCheckInterval) + } + } +} + +// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID +func (l *Log) segmentFileNames() ([]string, error) { + names, err := filepath.Glob(filepath.Join(l.path, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension))) + if err != nil { + return nil, err + } + sort.Strings(names) + return names, nil +} + +// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log +func (l *Log) newSegmentFile() error { + l.currentSegmentID += 1 + if l.currentSegmentFile != nil { + if err := l.currentSegmentFile.Close(); err != nil { + return err + } + } + + fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension)) + ff, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + l.currentSegmentSize = 0 + l.currentSegmentFile = ff + + return nil +} + +// shouldFlush +func (l *Log) shouldFlush() flushType { + l.cacheLock.RLock() + defer l.cacheLock.RUnlock() + + if len(l.cache) == 0 { + return noFlush + } + + if l.memorySize > l.MemorySizeThreshold { + return memoryFlush + } + + if time.Since(l.lastWriteTime) > l.FlushColdInterval { + return idleFlush + } + + return noFlush +} + +// cursor is a unidirectional iterator for a given entry in the cache +type walCursor struct { + cache Values + position int + direction tsdb.Direction +} + +func newWALCursor(cache Values, direction tsdb.Direction) *walCursor { + // position is set such that a call to Next will successfully advance + // to the next postion and return the value. + c := &walCursor{cache: cache, direction: direction, position: -1} + if direction.Reverse() { + c.position = len(c.cache) + } + return c +} + +func (c *walCursor) Direction() tsdb.Direction { return c.direction } + +// Seek will point the cursor to the given time (or key) +func (c *walCursor) Seek(seek []byte) (key, value []byte) { + // Seek cache index + c.position = sort.Search(len(c.cache), func(i int) bool { + return bytes.Compare(c.cache[i].TimeBytes(), seek) != -1 + }) + + // If seek is not in the cache, return the last value in the cache + if c.direction.Reverse() && c.position >= len(c.cache) { + c.position = len(c.cache) + } + + // Make sure our position points to something in the cache + if c.position < 0 || c.position >= len(c.cache) { + return nil, nil + } + + v := c.cache[c.position] + + return v.TimeBytes(), v.ValueBytes() +} + +// Next moves the cursor to the next key/value. 
will return nil if at the end +func (c *walCursor) Next() (key, value []byte) { + var v Value + if c.direction.Forward() { + v = c.nextForward() + } else { + v = c.nextReverse() + } + + return v.TimeBytes(), v.ValueBytes() +} + +// nextForward advances the cursor forward returning the next value +func (c *walCursor) nextForward() Value { + c.position++ + + if c.position >= len(c.cache) { + return &EmptyValue{} + } + + return c.cache[c.position] +} + +// nextReverse advances the cursor backwards returning the next value +func (c *walCursor) nextReverse() Value { + c.position-- + + if c.position < 0 { + return &EmptyValue{} + } + + return c.cache[c.position] +} + +// idFromFileName parses the segment file ID from its name +func idFromFileName(name string) (int, error) { + parts := strings.Split(filepath.Base(name), ".") + if len(parts) != 2 { + return 0, fmt.Errorf("file %s has wrong name format to have an id", name) + } + + id, err := strconv.ParseUint(parts[0][1:], 10, 32) + + return int(id), err +}
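Putting the pieces together, a hedged end-to-end sketch of driving the WAL directly: a stub IndexWriter stands in for the engine, SkipCache forces the write to flush straight through (as the engine tests do), and point parsing assumes the tsdb.ParsePointsString helper the rest of the test suite uses.

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdb/influxdb/tsdb"
	"github.com/influxdb/influxdb/tsdb/engine/pd1"
)

// captureIndex stands in for the engine and records what the WAL flushes.
type captureIndex struct {
	flushed map[string]pd1.Values
}

func (c *captureIndex) WriteAndCompact(valuesByKey map[string]pd1.Values, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
	c.flushed = valuesByKey
	return nil
}

func main() {
	idx := &captureIndex{}

	w := pd1.NewLog("/tmp/pd1-wal-example") // placeholder path
	w.Index = idx
	w.SkipCache = true // flush straight to the index on every write

	if err := w.Open(); err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// Assumes tsdb.ParsePointsString, the same parser the engine tests use.
	points, err := tsdb.ParsePointsString("cpu,host=A value=1.1 1000000000")
	if err != nil {
		log.Fatal(err)
	}
	if err := w.WritePoints(points, nil, nil); err != nil {
		log.Fatal(err)
	}

	// With SkipCache set, the write was flushed immediately, keyed by
	// series key plus field name (see seriesFieldKey in pd1.go).
	fmt.Println(len(idx.flushed)) // prints: 1
}
```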