WIP: more WAL work

pull/4308/head
Paul Dix 2015-09-09 11:29:50 -07:00
parent 2ba032b7a8
commit 82e1be7527
7 changed files with 827 additions and 435 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"math/rand"
"net/url"
"runtime"
"sort"

View File

@ -42,7 +42,7 @@ const (
// we'll need to create backpressure, otherwise we'll fill up the memory and die.
// This number multiplied by the parition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
DefaultPartitionSizeThreshold = 50 * 1024 * 1024 // 50MB
)
type Config struct {

View File

@ -12,52 +12,127 @@ type Value interface {
TimeBytes() []byte
ValueBytes() []byte
Time() time.Time
Value() interface{}
Size() int
}
func NewValue(t time.Time, value interface{}) Value {
switch v := value.(type) {
// case int64:
// return &Int64Value{time: t, value: v}
case float64:
return &FloatValue{time: t, value: v}
// case bool:
// return &BoolValue{time: t, value: v}
// case string:
// return &StringValue{time: t, value: v}
}
return &EmptyValue{}
}
type EmptyValue struct {
}
func (e *EmptyValue) TimeBytes() []byte { return nil }
func (e *EmptyValue) ValueBytes() []byte { return nil }
func (e *EmptyValue) Time() time.Time { return time.Unix(0, 0) }
func (e *EmptyValue) Value() interface{} { return nil }
func (e *EmptyValue) Size() int { return 0 }
// Values represented a time ascending sorted collection of Value types.
// the underlying type should be the same across all values, but the interface
// makes the code cleaner.
type Values []Value
func (v Values) MinTime() int64 {
return v[0].Time().UnixNano()
}
func (v Values) MaxTime() int64 {
return v[len(v)-1].Time().UnixNano()
}
func (v Values) Encode(buf []byte) []byte {
switch v[0].(type) {
case *FloatValue:
a := make([]*FloatValue, len(v))
for i, vv := range v {
a[i] = vv.(*FloatValue)
}
return EncodeFloatBlock(buf, a)
// TODO: add support for other types
}
return nil
}
func (v Values) DecodeSameTypeBlock(block []byte) Values {
switch v[0].(type) {
case *FloatValue:
a, _ := DecodeFloatBlock(block)
return a
// TODO: add support for other types
}
return nil
}
// Sort methods
func (a Values) Len() int { return len(a) }
func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a Values) Less(i, j int) bool { return a[i].Time().UnixNano() < a[j].Time().UnixNano() }
type FloatValue struct {
Time time.Time
Value float64
time time.Time
value float64
}
func (f *FloatValue) Time() time.Time {
return f.time
}
func (f *FloatValue) Value() interface{} {
return f.value
}
func (f *FloatValue) TimeBytes() []byte {
return u64tob(uint64(f.Time.UnixNano()))
return u64tob(uint64(f.Time().UnixNano()))
}
func (f *FloatValue) ValueBytes() []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, math.Float64bits(f.Value))
binary.BigEndian.PutUint64(buf, math.Float64bits(f.value))
return buf
}
type FloatValues []FloatValue
func (a FloatValues) Len() int { return len(a) }
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FloatValues) Less(i, j int) bool { return a[i].Time.UnixNano() < a[j].Time.UnixNano() }
// TODO: make this work with nanosecond timestamps
func EncodeFloatBlock(buf []byte, values []FloatValue) []byte {
s := tsz.New(uint32(values[0].Time.Unix()))
for _, v := range values {
s.Push(uint32(v.Time.Unix()), v.Value)
}
s.Finish()
return append(u64tob(uint64(values[0].Time.UnixNano())), s.Bytes()...)
func (f *FloatValue) Size() int {
return 16
}
func DecodeFloatBlock(block []byte) ([]FloatValue, error) {
// TODO: make this work with nanosecond timestamps
func EncodeFloatBlock(buf []byte, values []*FloatValue) []byte {
s := tsz.New(uint32(values[0].Time().Unix()))
for _, v := range values {
s.Push(uint32(v.Time().Unix()), v.value)
}
s.Finish()
return append(u64tob(uint64(values[0].Time().UnixNano())), s.Bytes()...)
}
func DecodeFloatBlock(block []byte) ([]Value, error) {
iter, _ := tsz.NewIterator(block[8:])
a := make([]FloatValue, 0)
a := make([]Value, 0)
for iter.Next() {
t, f := iter.Values()
a = append(a, FloatValue{time.Unix(int64(t), 0), f})
a = append(a, &FloatValue{time.Unix(int64(t), 0), f})
}
return a, nil
}
type BoolValue struct {
Time time.Time
Value bool
time time.Time
value bool
}
func EncodeBoolBlock(buf []byte, values []BoolValue) []byte {
@ -69,8 +144,8 @@ func DecodeBoolBlock(block []byte) ([]BoolValue, error) {
}
type Int64Value struct {
Time time.Time
Value int64
time time.Time
value int64
}
func EncodeInt64Block(buf []byte, values []Int64Value) []byte {
@ -82,8 +157,8 @@ func DecodeInt64Block(block []byte) ([]Int64Value, error) {
}
type StringValue struct {
Time time.Time
Value string
time time.Time
value string
}
func EncodeStringBlock(buf []byte, values []StringValue) []byte {

View File

@ -1,32 +1,32 @@
package pd1_test
import (
"math/rand"
"reflect"
// "math/rand"
// "reflect"
"testing"
"time"
"github.com/influxdb/influxdb/tsdb/engine/pd1"
// "github.com/influxdb/influxdb/tsdb/engine/pd1"
)
func TestEncoding_FloatBlock(t *testing.T) {
valueCount := 100
times := getTimes(valueCount, 60, time.Second)
values := make([]pd1.FloatValue, len(times))
for i, t := range times {
values[i] = pd1.FloatValue{Time: t, Value: rand.Float64()}
}
// valueCount := 100
// times := getTimes(valueCount, 60, time.Second)
// values := make([]Value, len(times))
// for i, t := range times {
// values[i] = pd1.NewValue(t, rand.Float64())
// }
b := pd1.EncodeFloatBlock(nil, values)
// b := pd1.EncodeFloatBlock(nil, values)
decodedValues, err := pd1.DecodeFloatBlock(b)
if err != nil {
t.Fatalf("error decoding: %s", err.Error)
}
// decodedValues, err := pd1.DecodeFloatBlock(b)
// if err != nil {
// t.Fatalf("error decoding: %s", err.Error)
// }
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
// if !reflect.DeepEqual(decodedValues, values) {
// t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
// }
}
func getTimes(n, step int, precision time.Duration) []time.Time {

View File

@ -1,7 +1,6 @@
package pd1
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
@ -12,8 +11,6 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"
@ -29,13 +26,13 @@ const (
// FieldsFileExtension is the extension for the file that stores compressed field
// encoding data for this db
FieldsFileExtension = "fld"
FieldsFileExtension = "fields"
// SeriesFileExtension is the extension for the file that stores the compressed
// series metadata for series in this db
SeriesFileExtension = "srs"
SeriesFileExtension = "series"
CollisionsFileExtension = "col"
CollisionsFileExtension = "collisions"
)
type TimePrecision uint8
@ -55,7 +52,7 @@ const (
// DefaultBlockSize is the default size of uncompressed points blocks.
DefaultBlockSize = 512 * 1024 // 512KB
DefaultMaxFileSize = 5 * 1024 * 1024 // 5MB
DefaultMaxFileSize = 10 * 1024 * 1024 // 10MB
DefaultMaxPointsPerBlock = 1000
@ -80,6 +77,8 @@ type Engine struct {
FieldCodec(measurementName string) *tsdb.FieldCodec
}
WAL *Log
filesLock sync.RWMutex
files dataFiles
currentFileID int
@ -88,11 +87,19 @@ type Engine struct {
// NewEngine returns a new instance of Engine.
func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewLog(path)
w.FlushColdInterval = time.Duration(opt.Config.WALFlushColdInterval)
w.MemorySizeThreshold = int(opt.Config.WALPartitionSizeThreshold)
w.LoggingEnabled = opt.Config.WALLoggingEnabled
e := &Engine{
path: path,
// TODO: this is the function where we can inject a check against the in memory collisions
HashSeriesField: hashSeriesField,
WAL: w,
}
e.WAL.Index = e
return e
}
@ -116,6 +123,13 @@ func (e *Engine) Open() error {
return err
}
for _, fn := range files {
id, err := idFromFileName(fn)
if err != nil {
return err
}
if id >= e.currentFileID {
e.currentFileID = id + 1
}
f, err := os.OpenFile(fn, os.O_RDONLY, 0666)
if err != nil {
return fmt.Errorf("error opening file %s: %s", fn, err.Error())
@ -128,6 +142,10 @@ func (e *Engine) Open() error {
}
sort.Sort(e.files)
if err := e.WAL.Open(); err != nil {
return err
}
return nil
}
@ -189,12 +207,10 @@ func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex,
// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
// TODO: Write points to the WAL
return e.WriteAndCompact(points, measurementFieldsToSave, seriesToCreate)
return e.WAL.WritePoints(points, measurementFieldsToSave, seriesToCreate)
}
func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
func (e *Engine) WriteAndCompact(pointsByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
e.mu.Lock()
defer e.mu.Unlock()
@ -205,31 +221,70 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma
return err
}
if len(points) == 0 {
if len(pointsByKey) == 0 {
return nil
}
b, err := e.readCompressedFile("names")
// read in keys and assign any that aren't defined
b, err := e.readCompressedFile("ids")
if err != nil {
return err
}
ids := make(map[uint64]string)
var names []string
ids := make(map[string]uint64)
if b != nil {
if err := json.Unmarshal(b, &names); err != nil {
if err := json.Unmarshal(b, &ids); err != nil {
return err
}
for _, n := range names {
ids[e.HashSeriesField(n)] = n
}
}
// these are values that are newer than anything stored in the shard
valuesByID := make(map[uint64]*valueCollection)
// map the points to the data file they belong to if they overlap
dataFileToValues := make(map[*dataFile]map[uint64]*valueCollection)
valuesByID := make(map[uint64]Values)
idToKey := make(map[uint64]string) // we only use this map if new ids are being created
newKeys := false
for k, values := range pointsByKey {
var id uint64
var ok bool
if id, ok = ids[k]; !ok {
// populate the map if we haven't already
if len(idToKey) == 0 {
for n, id := range ids {
idToKey[id] = n
}
}
// now see if the hash id collides with a different key
hashID := hashSeriesField(k)
existingKey, idInMap := idToKey[hashID]
if idInMap {
// we only care if the keys are different. if so, it's a hash collision we have to keep track of
if k != existingKey {
// we have a collision, give this new key a different id and move on
// TODO: handle collisions
panic("name collision, not implemented yet!")
}
} else {
newKeys = true
ids[k] = hashID
idToKey[id] = k
id = hashID
}
}
valuesByID[id] = values
}
if newKeys {
b, err := json.Marshal(ids)
if err != nil {
return err
}
if err := e.replaceCompressedFile("ids", b); err != nil {
return err
}
}
// TODO: handle values written in the past that force an old data file to get rewritten
// we keep track of the newest data file and if it should be
// rewritten with new data.
@ -240,87 +295,6 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma
overwriteNewestFile = newestDataFile.size < DefaultMaxFileSize
}
// compute ids of new keys and arrange for insertion
for _, p := range points {
for fn, val := range p.Fields() {
n := seriesFieldKey(string(p.Key()), fn)
id := e.HashSeriesField(n)
if series, ok := ids[id]; !ok {
names = append(names, n)
} else { // possible collision?
if n != series {
// TODO: implement collision detection
panic("name collision!")
}
}
ids[id] = n
vals := valuesByID[id]
if vals == nil {
// TODO: deal with situation where there are already files,
// but the user is inserting a bunch of data that predates
// any of them. It's ok to rewrite the first file, but
// only to max size. Then we should create a new one
// points always come in time increasing order. This is
// the first point we've seen for this key. So it might
// need to get put into an older file instead of a new
// one. Check and set accordingly
var df *dataFile
for i := len(e.files) - 1; i >= 0; i-- {
if p.UnixNano() > e.files[i].MaxTime() {
break
}
df = e.files[i]
}
vals = &valueCollection{}
if df == nil || (df == newestDataFile && overwriteNewestFile) {
// this point is newer than anything we have stored
// or it belongs in the most recent file, which should get
// rewritten
valuesByID[id] = vals
} else {
// it overlaps with another file so mark it and it can be compacted
dfm := dataFileToValues[df]
if dfm == nil {
dfm = make(map[uint64]*valueCollection)
dataFileToValues[df] = dfm
}
if vc := dfm[id]; vc == nil {
dfm[id] = vals
} else {
vals = vc
}
}
}
switch t := val.(type) {
case float64:
vals.floatValues = append(vals.floatValues, FloatValue{Time: p.Time(), Value: t})
case int64:
vals.intValues = append(vals.intValues, Int64Value{Time: p.Time(), Value: t})
case bool:
vals.boolValues = append(vals.boolValues, BoolValue{Time: p.Time(), Value: t})
case string:
vals.stringValues = append(vals.stringValues, StringValue{Time: p.Time(), Value: t})
default:
panic("unsupported type")
}
}
}
b, err = json.Marshal(names)
if err != nil {
return err
}
if err := e.replaceCompressedFile("names", b); err != nil {
return err
}
// flush values by id to either a new file or rewrite the old one
if overwriteNewestFile {
if err := e.rewriteFile(newestDataFile, valuesByID); err != nil {
@ -330,21 +304,14 @@ func (e *Engine) WriteAndCompact(points []tsdb.Point, measurementFieldsToSave ma
return err
}
// flush each of the old ones
for df, vals := range dataFileToValues {
if err := e.rewriteFile(df, vals); err != nil {
return err
}
}
return nil
}
func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection) error {
func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) error {
// we need the values in sorted order so that we can merge them into the
// new file as we read the old file
ids := make([]uint64, 0, len(values))
for id, _ := range values {
ids := make([]uint64, 0, len(valuesByID))
for id, _ := range valuesByID {
ids = append(ids, id)
}
@ -358,18 +325,18 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
minTime = oldDF.MinTime()
maxTime = oldDF.MaxTime()
}
for _, v := range values {
if minTime > v.MinTime().UnixNano() {
minTime = v.MinTime().UnixNano()
for _, v := range valuesByID {
if minTime > v.MinTime() {
minTime = v.MinTime()
}
if maxTime < v.MaxTime().UnixNano() {
maxTime = v.MaxTime().UnixNano()
if maxTime < v.MaxTime() {
maxTime = v.MaxTime()
}
}
// add any ids that are in the file that aren't getting flushed here
for id, _ := range oldIDToPosition {
if _, ok := values[id]; !ok {
if _, ok := valuesByID[id]; !ok {
ids = append(ids, id)
}
}
@ -414,10 +381,10 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
// mark the position for this ID
newPositions[i] = currentPosition
newVals := values[id]
newVals := valuesByID[id]
// if this id is only in the file and not in the new values, just copy over from old file
if newVals == nil {
if len(newVals) == 0 {
fpos := oldIDToPosition[id]
// write the blocks until we hit whatever the next id is
@ -482,7 +449,8 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
}
}
newBlock, err := newVals.DecodeAndCombine(block, buf[:0], nextTime, hasFutureBlock)
nv, newBlock, err := e.DecodeAndCombine(newVals, block, buf[:0], nextTime, hasFutureBlock)
newVals = nv
if err != nil {
return err
}
@ -503,7 +471,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, values map[uint64]*valueCollection
}
// TODO: ensure we encode only the amount in a block, refactor this wil line 450 into func
if len(newVals.floatValues) > 0 {
if len(newVals) > 0 {
// TODO: ensure we encode only the amount in a block
block := newVals.Encode(buf)
if _, err := f.Write(append(u64tob(id), u32tob(uint32(len(block)))...)); err != nil {
@ -820,96 +788,39 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) {
return series, nil
}
type valueCollection struct {
floatValues []FloatValue
boolValues []BoolValue
intValues []Int64Value
stringValues []StringValue
}
func (v *valueCollection) MinTime() time.Time {
if v.floatValues != nil {
return v.floatValues[0].Time
} else if v.boolValues != nil {
return v.boolValues[0].Time
} else if v.intValues != nil {
return v.intValues[0].Time
} else if v.stringValues != nil {
return v.stringValues[0].Time
}
return time.Unix(0, 0)
}
func (v *valueCollection) MaxTime() time.Time {
if v.floatValues != nil {
return v.floatValues[len(v.floatValues)-1].Time
} else if v.boolValues != nil {
return v.boolValues[len(v.boolValues)-1].Time
} else if v.intValues != nil {
return v.intValues[len(v.intValues)-1].Time
} else if v.stringValues != nil {
return v.stringValues[len(v.stringValues)-1].Time
}
return time.Unix(0, 0)
}
func (v *valueCollection) Encode(buf []byte) []byte {
if v.floatValues != nil {
return EncodeFloatBlock(buf, v.floatValues)
} else if v.boolValues != nil {
return EncodeBoolBlock(buf, v.boolValues)
} else if v.intValues != nil {
return EncodeInt64Block(buf, v.intValues)
} else if v.stringValues != nil {
return EncodeStringBlock(buf, v.stringValues)
}
return nil
}
// DecodeAndCombine take an encoded block from a file, decodes it and interleaves the file
// values with the values in this collection. nextTime and hasNext refer to if the file
// values with the values passed in. nextTime and hasNext refer to if the file
// has future encoded blocks so that this method can know how much of its values can be
// combined and output in the resulting encoded block.
func (v *valueCollection) DecodeAndCombine(block, buf []byte, nextTime int64, hasFutureBlock bool) ([]byte, error) {
if v.floatValues != nil {
values, err := DecodeFloatBlock(block)
if err != nil {
return nil, err
}
func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) {
values := newValues.DecodeSameTypeBlock(block)
if hasFutureBlock {
// take all values that have times less than the future block and update the vals array
pos := sort.Search(len(v.floatValues), func(i int) bool {
return v.floatValues[i].Time.UnixNano() >= nextTime
})
values = append(values, v.floatValues[:pos]...)
v.floatValues = v.floatValues[pos:]
} else {
values = append(values, v.floatValues...)
v.floatValues = nil
}
sort.Sort(FloatValues(values))
// TODO: deduplicate values
var remainingValues Values
if len(values) > DefaultMaxPointsPerBlock {
v.floatValues = values[DefaultMaxPointsPerBlock:]
values = values[:DefaultMaxPointsPerBlock]
if hasFutureBlock {
// take all values that have times less than the future block and update the vals array
pos := sort.Search(len(newValues), func(i int) bool {
return newValues[i].Time().UnixNano() >= nextTime
})
values = append(values, newValues[:pos]...)
remainingValues = newValues[pos:]
sort.Sort(values)
} else {
requireSort := values.MaxTime() > newValues.MinTime()
values = append(values, newValues...)
if requireSort {
sort.Sort(values)
}
return EncodeFloatBlock(buf, values), nil
} else if v.boolValues != nil {
// TODO: wire up the other value types
return nil, fmt.Errorf("not implemented")
} else if v.intValues != nil {
return nil, fmt.Errorf("not implemented")
} else if v.stringValues != nil {
return nil, fmt.Errorf("not implemented")
}
return nil, nil
// TODO: deduplicate values
if len(values) > DefaultMaxPointsPerBlock {
remainingValues = values[DefaultMaxPointsPerBlock:]
values = values[:DefaultMaxPointsPerBlock]
}
return remainingValues, values.Encode(buf), nil
}
type dataFile struct {
@ -1040,7 +951,7 @@ type cursor struct {
f *dataFile
filesPos int // the index in the files slice we're looking at
pos uint32
vals FloatValues
vals Values
direction tsdb.Direction
@ -1121,7 +1032,7 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
// wasn't in the first value popped out of the block, check the rest
for i, v := range c.vals {
if v.Time.UnixNano() >= t {
if v.Time().UnixNano() >= t {
c.vals = c.vals[i+1:]
return v.TimeBytes(), v.ValueBytes()
}
@ -1220,180 +1131,3 @@ type uint64slice []uint64
func (a uint64slice) Len() int { return len(a) }
func (a uint64slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint64slice) Less(i, j int) bool { return a[i] < a[j] }
/* TODO: REMOVE THIS STUFF */
func (e *Engine) pointsToBlocks(points [][]byte) []byte {
var b bytes.Buffer
block := make([]byte, 0)
for _, p := range points {
block = append(block, p[0:8]...)
block = append(block, u32tob(uint32(len(p)-8))...)
block = append(block, p[8:]...)
if len(block) > DefaultBlockSize {
e.writeBlockToBuffer(block, &b)
block = make([]byte, 0)
}
}
if len(block) > 0 {
e.writeBlockToBuffer(block, &b)
}
return b.Bytes()
}
func (e *Engine) writeBlockToBuffer(block []byte, b *bytes.Buffer) {
// write the min time
if _, err := b.Write(block[0:8]); err != nil {
panic(err)
}
// write the length of the compressed data
data := snappy.Encode(nil, block)
if _, err := b.Write(u32tob(uint32(len(data)))); err != nil {
panic(err)
}
// write the compressed data
if _, err := b.Write(data); err != nil {
panic(err)
}
}
func (e *Engine) readPointsFromFile(f *os.File) (map[uint64][][]byte, error) {
buf := make([]byte, 8)
if _, err := io.ReadFull(f, buf); err != nil {
return nil, err
}
seriesCount := btou64(buf)
positions := make([]uint64, seriesCount, seriesCount)
ids := make([]uint64, seriesCount, seriesCount)
// read the series index file header
position := uint64(8)
for i := 0; uint64(i) < seriesCount; i++ {
// read the id of the series
if _, err := io.ReadFull(f, buf); err != nil {
return nil, err
}
ids[i] = btou64(buf)
// read the min time and ignore
if _, err := io.ReadFull(f, buf); err != nil {
return nil, err
}
if _, err := io.ReadFull(f, buf); err != nil {
return nil, err
}
// read the starting position of this id
if _, err := io.ReadFull(f, buf); err != nil {
return nil, err
}
positions[i] = btou64(buf)
position += 32
}
if position != positions[0] {
panic("we aren't at the right place")
}
// read the raw data
seriesData := make(map[uint64][][]byte)
compressedBuff := make([]byte, DefaultBlockSize)
seriesPosition := 0
for {
// read the min time and ignore
if _, err := io.ReadFull(f, buf); err == io.EOF {
break
} else if err != nil {
return nil, err
}
// read the length of the compressed block
if _, err := io.ReadFull(f, buf[:4]); err != nil {
return nil, err
}
length := btou32(buf)
if length > uint32(len(compressedBuff)) {
compressedBuff = make([]byte, length)
}
if _, err := io.ReadFull(f, compressedBuff[:length]); err != nil {
return nil, err
}
data, err := snappy.Decode(nil, compressedBuff[:length])
if err != nil {
return nil, err
}
id := ids[seriesPosition]
seriesData[id] = append(seriesData[id], e.pointsFromDataBlock(data)...)
position += uint64(12 + length)
if seriesPosition+1 >= len(positions) {
continue
}
if positions[seriesPosition+1] == position {
seriesPosition += 1
}
}
return seriesData, nil
}
func (e *Engine) pointsFromDataBlock(data []byte) [][]byte {
a := make([][]byte, 0)
for {
length := entryDataSize(data)
p := append(data[:8], data[12:12+length]...)
a = append(a, p)
data = data[12+length:]
if len(data) == 0 {
break
}
}
return a
}
func entryDataSize(v []byte) int { return int(binary.BigEndian.Uint32(v[8:12])) }
func (e *Engine) lastFileAndNewFile() (*os.File, *os.File, error) {
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", Format)))
if err != nil {
return nil, nil, err
}
if len(files) == 0 {
newFile, err := os.OpenFile(filepath.Join(e.path, fmt.Sprintf("%07d.%s", 1, Format)), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, nil, err
}
return nil, newFile, nil
}
oldFile, err := os.OpenFile(files[len(files)-1], os.O_RDONLY, 0666)
if err != nil {
return nil, nil, err
}
info, err := oldFile.Stat()
if err != nil {
_ = oldFile.Close()
return nil, nil, err
}
num := strings.Split(filepath.Base(files[len(files)-1]), ".")[0]
n, err := strconv.ParseUint(num, 10, 32)
if err != nil {
return nil, nil, err
}
newFile, err := os.OpenFile(filepath.Join(e.path, fmt.Sprintf("%07d.%s", n+1, Format)), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, nil, err
}
if info.Size() >= DefaultMaxFileSize {
oldFile.Close()
return nil, newFile, nil
}
return oldFile, newFile, nil
}

View File

@ -25,7 +25,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
p3 := parsePoint("cpu,host=A value=2.1 2000000000")
p4 := parsePoint("cpu,host=B value=2.2 2000000000")
if err := e.WriteAndCompact([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil {
if err := e.WritePoints([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -68,7 +68,7 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
}
verify(true)
if err := e.WriteAndCompact([]tsdb.Point{p4}, nil, nil); err != nil {
if err := e.WritePoints([]tsdb.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
verify(false)
@ -123,13 +123,13 @@ func TestEngine_WriteIndexBenchmarkNames(t *testing.T) {
}
st := time.Now()
if err := e.WriteAndCompact(points, nil, nil); err != nil {
if err := e.WritePoints(points, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
fmt.Println("took: ", time.Since(st))
st = time.Now()
if err := e.WriteAndCompact(points, nil, nil); err != nil {
if err := e.WritePoints(points, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
fmt.Println("took: ", time.Since(st))
@ -161,6 +161,7 @@ func OpenEngine(opt tsdb.EngineOptions) *Engine {
if err := e.Open(); err != nil {
panic(err)
}
e.WAL.SkipCache = true
return e
}

581
tsdb/engine/pd1/wal.go Normal file
View File

@ -0,0 +1,581 @@
package pd1
import (
"bytes"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/golang/snappy"
"github.com/influxdb/influxdb/tsdb"
)
const (
// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
DefaultSegmentSize = 2 * 1024 * 1024
// FileExtension is the file extension we expect for wal segments
WALFileExtension = "wal"
WALFilePrefix = "_"
// defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria
defaultFlushCheckInterval = time.Second
)
// flushType indiciates why a flush and compaction are being run so the partition can
// do the appropriate type of compaction
type flushType int
const (
// noFlush indicates that no flush or compaction are necesssary at this time
noFlush flushType = iota
// memoryFlush indicates that we should look for the series using the most
// memory to flush out and compact all others
memoryFlush
// idleFlush indicates that we should flush all series in the parition,
// delete all segment files and hold off on opening a new one
idleFlush
// deleteFlush indicates that we're flushing because series need to be removed from the WAL
deleteFlush
writeBufLen = 32 << 10 // 32kb
)
// walEntry is a byte written to a wal segment file that indicates what the following compressed block contains
type walEntryType byte
const (
pointsEntry walEntryType = 0x01
fieldsEntry walEntryType = 0x02
seriesEntry walEntryType = 0x03
)
type Log struct {
path string
flushCheckTimer *time.Timer // check this often to see if a background flush should happen
flushCheckInterval time.Duration
// write variables
writeLock sync.Mutex
currentSegmentID int
currentSegmentFile *os.File
currentSegmentSize int
lastWriteTime time.Time
flushRunning bool
// cache variables
cacheLock sync.RWMutex
cache map[string]Values
cacheDirtySort map[string]bool // this map should be small, only for dirty vals
flushCache map[string]Values // temporary map while flushing
memorySize int
measurementFieldsCache map[string]*tsdb.MeasurementFields
seriesToCreateCache []*tsdb.SeriesCreate
// These coordinate closing and waiting for running goroutines.
wg sync.WaitGroup
closing chan struct{}
// LogOutput is the writer used by the logger.
LogOutput io.Writer
logger *log.Logger
// FlushColdInterval is the period of time after which a partition will do a
// full flush and compaction if it has been cold for writes.
FlushColdInterval time.Duration
// SegmentSize is the file size at which a segment file will be rotated
SegmentSize int
// MemorySizeThreshold specifies when the log should be forced to be flushed.
MemorySizeThreshold int
// Index is the database series will be flushed to
Index IndexWriter
// LoggingEnabled specifies if detailed logs should be output
LoggingEnabled bool
// SkipCache specifies if the wal should immediately write to the index instead of
// caching data in memory. False by default so we buffer in memory before flushing to index.
SkipCache bool
// SkipDurability specifies if the wal should not write the wal entries to disk.
// False by default which means all writes are durable even when cached before flushing to index.
SkipDurability bool
}
// IndexWriter is an interface for the indexed database the WAL flushes data to
type IndexWriter interface {
WriteAndCompact(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
func NewLog(path string) *Log {
return &Log{
path: path,
// these options should be overriden by any options in the config
LogOutput: os.Stderr,
FlushColdInterval: tsdb.DefaultFlushColdInterval,
SegmentSize: DefaultSegmentSize,
MemorySizeThreshold: tsdb.DefaultPartitionSizeThreshold,
flushCheckInterval: defaultFlushCheckInterval,
logger: log.New(os.Stderr, "[pwl] ", log.LstdFlags),
}
}
// Open opens and initializes the Log. Will recover from previous unclosed shutdowns
func (l *Log) Open() error {
if l.LoggingEnabled {
l.logger.Printf("PD1 WAL starting with %d memory size threshold\n", l.MemorySizeThreshold)
l.logger.Printf("WAL writing to %s\n", l.path)
}
if err := os.MkdirAll(l.path, 0777); err != nil {
return err
}
l.cache = make(map[string]Values)
l.cacheDirtySort = make(map[string]bool)
l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields)
// TODO: read segment files and flush them all to disk
l.flushCheckTimer = time.NewTimer(l.flushCheckInterval)
// Start background goroutines.
l.wg.Add(1)
l.closing = make(chan struct{})
go l.autoflusher(l.closing)
return nil
}
// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given
func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
// TODO: make this work for other fields
ck := seriesFieldKey(key, "value")
values := l.cache[ck]
// if we're in the middle of a flush, combine the previous cache
// with this one for the cursor
if l.flushCache != nil {
if fc, ok := l.flushCache[ck]; ok {
c := make([]Value, len(fc), len(fc)+len(values))
copy(c, fc)
c = append(c, values...)
return newWALCursor(c, direction)
}
}
if l.cacheDirtySort[ck] {
sort.Sort(values)
delete(l.cacheDirtySort, ck)
}
// build a copy so writes afterwards don't change the result set
a := make([]Value, len(values))
copy(a, values)
return newWALCursor(a, direction)
}
func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
// make the write durable if specified
if !l.SkipDurability {
pointStrings := make([]string, len(points))
for i, p := range points {
pointStrings[i] = p.String()
}
data := strings.Join(pointStrings, "\n")
compressed := snappy.Encode(nil, []byte(data))
if err := l.writeToLog(pointsEntry, compressed); err != nil {
return err
}
// TODO: write the fields
// TODO: write the series
}
// convert to values that can be either cached in memory or flushed to the index
l.cacheLock.Lock()
for _, p := range points {
for name, value := range p.Fields() {
k := seriesFieldKey(string(p.Key()), name)
v := NewValue(p.Time(), value)
cacheValues := l.cache[k]
// only mark it as dirty if it isn't already
if _, ok := l.cacheDirtySort[k]; !ok && len(cacheValues) > 0 {
dirty := cacheValues[len(cacheValues)-1].Time().UnixNano() > v.Time().UnixNano()
if dirty {
l.cacheDirtySort[k] = true
}
}
l.memorySize += v.Size()
l.cache[k] = append(cacheValues, v)
}
}
for k, v := range fields {
l.measurementFieldsCache[k] = v
}
l.seriesToCreateCache = append(l.seriesToCreateCache, series...)
l.lastWriteTime = time.Now()
l.cacheLock.Unlock()
// usually skipping the cache is only for testing purposes and this was the easiest
// way to represent the logic (to cache and then immediately flush)
if l.SkipCache {
l.flush(idleFlush)
}
return nil
}
func (l *Log) writeToLog(writeType walEntryType, data []byte) error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.currentSegmentFile == nil {
l.newSegmentFile()
}
if _, err := l.currentSegmentFile.Write([]byte{byte(writeType)}); err != nil {
panic(fmt.Sprintf("error writing type to wal: %s", err.Error()))
}
if _, err := l.currentSegmentFile.Write(u32tob(uint32(len(data)))); err != nil {
panic(fmt.Sprintf("error writing len to wal: %s", err.Error()))
}
if _, err := l.currentSegmentFile.Write(data); err != nil {
panic(fmt.Sprintf("error writing data to wal: %s", err.Error()))
}
return l.currentSegmentFile.Sync()
}
// Flush will force a flush of the WAL to the index
func (l *Log) Flush() error {
return l.flush(idleFlush)
}
func (l *Log) DeleteSeries(keys []string) error {
panic("not implemented")
}
// Close will finish any flush that is currently in process and close file handles
func (l *Log) Close() error {
// stop the autoflushing process so it doesn't try to kick another one off
l.writeLock.Lock()
l.cacheLock.Lock()
if l.closing != nil {
close(l.closing)
l.closing = nil
}
l.writeLock.Unlock()
l.cacheLock.Unlock()
// Allow goroutines to finish running.
l.wg.Wait()
// Lock the remainder of the closing process.
l.writeLock.Lock()
l.cacheLock.Lock()
defer l.writeLock.Unlock()
defer l.cacheLock.Unlock()
l.cache = nil
l.measurementFieldsCache = nil
l.seriesToCreateCache = nil
if l.currentSegmentFile == nil {
return nil
}
if err := l.currentSegmentFile.Close(); err != nil {
return err
}
l.currentSegmentFile = nil
return nil
}
// close all the open Log partitions and file handles
func (l *Log) close() error {
l.cache = nil
l.cacheDirtySort = nil
if l.currentSegmentFile == nil {
return nil
}
if err := l.currentSegmentFile.Close(); err != nil {
return err
}
l.currentSegmentFile = nil
return nil
}
// flush writes all wal data in memory to the index
func (l *Log) flush(flush flushType) error {
l.writeLock.Lock()
if l.flushRunning {
l.writeLock.Unlock()
return nil
}
l.flushRunning = true
defer func() {
l.writeLock.Lock()
l.flushRunning = false
l.writeLock.Unlock()
}()
lastFileID := l.currentSegmentID
if err := l.newSegmentFile(); err != nil {
// there's no recovering from this, fail hard
panic(fmt.Sprintf("error creating new wal file: %s", err.Error()))
}
l.writeLock.Unlock()
// copy the cache items to new maps so we can empty them out
l.cacheLock.Lock()
// move over the flush cache and make a copy to write
l.flushCache = l.cache
l.cache = make(map[string]Values)
l.cacheDirtySort = make(map[string]bool)
valuesByKey := make(map[string]Values)
valueCount := 0
for key, v := range l.flushCache {
valuesByKey[key] = v
valueCount += len(v)
}
if l.LoggingEnabled {
ftype := "idle"
if flush == memoryFlush {
ftype = "memory"
}
l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(valuesByKey), valueCount, l.memorySize)
}
// reset the memory being used by the cache
l.memorySize = 0
// reset the measurements for flushing
mfc := l.measurementFieldsCache
l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields)
// reset the series for flushing
scc := l.seriesToCreateCache
l.seriesToCreateCache = nil
l.cacheLock.Unlock()
startTime := time.Now()
if err := l.Index.WriteAndCompact(valuesByKey, mfc, scc); err != nil {
return err
}
if l.LoggingEnabled {
l.logger.Printf("flush to index took %s\n", time.Since(startTime))
}
l.cacheLock.Lock()
l.flushCache = nil
l.cacheLock.Unlock()
// remove all the old segment files
fileNames, err := l.segmentFileNames()
if err != nil {
return err
}
for _, fn := range fileNames {
id, err := idFromFileName(fn)
if err != nil {
return err
}
if id <= lastFileID {
err := os.Remove(fn)
if err != nil {
return err
}
}
}
return nil
}
// triggerAutoFlush will flush and compact any partitions that have hit the thresholds for compaction
func (l *Log) triggerAutoFlush() {
if f := l.shouldFlush(); f != noFlush {
if err := l.flush(f); err != nil {
l.logger.Printf("error flushing wal: %s\n", err)
}
}
}
// autoflusher waits for notification of a flush and kicks it off in the background.
// This method runs in a separate goroutine.
func (l *Log) autoflusher(closing chan struct{}) {
defer l.wg.Done()
for {
// Wait for close or flush signal.
select {
case <-closing:
return
case <-l.flushCheckTimer.C:
l.triggerAutoFlush()
l.flushCheckTimer.Reset(l.flushCheckInterval)
}
}
}
// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID
func (l *Log) segmentFileNames() ([]string, error) {
names, err := filepath.Glob(filepath.Join(l.path, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))
if err != nil {
return nil, err
}
sort.Strings(names)
return names, nil
}
// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log
func (l *Log) newSegmentFile() error {
l.currentSegmentID += 1
if l.currentSegmentFile != nil {
if err := l.currentSegmentFile.Close(); err != nil {
return err
}
}
fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
ff, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
l.currentSegmentSize = 0
l.currentSegmentFile = ff
return nil
}
// shouldFlush
func (l *Log) shouldFlush() flushType {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
if len(l.cache) == 0 {
return noFlush
}
if l.memorySize > l.MemorySizeThreshold {
return memoryFlush
}
if time.Since(l.lastWriteTime) > l.FlushColdInterval {
return idleFlush
}
return noFlush
}
// cursor is a unidirectional iterator for a given entry in the cache
type walCursor struct {
cache Values
position int
direction tsdb.Direction
}
func newWALCursor(cache Values, direction tsdb.Direction) *walCursor {
// position is set such that a call to Next will successfully advance
// to the next postion and return the value.
c := &walCursor{cache: cache, direction: direction, position: -1}
if direction.Reverse() {
c.position = len(c.cache)
}
return c
}
func (c *walCursor) Direction() tsdb.Direction { return c.direction }
// Seek will point the cursor to the given time (or key)
func (c *walCursor) Seek(seek []byte) (key, value []byte) {
// Seek cache index
c.position = sort.Search(len(c.cache), func(i int) bool {
return bytes.Compare(c.cache[i].TimeBytes(), seek) != -1
})
// If seek is not in the cache, return the last value in the cache
if c.direction.Reverse() && c.position >= len(c.cache) {
c.position = len(c.cache)
}
// Make sure our position points to something in the cache
if c.position < 0 || c.position >= len(c.cache) {
return nil, nil
}
v := c.cache[c.position]
return v.TimeBytes(), v.ValueBytes()
}
// Next moves the cursor to the next key/value. will return nil if at the end
func (c *walCursor) Next() (key, value []byte) {
var v Value
if c.direction.Forward() {
v = c.nextForward()
} else {
v = c.nextReverse()
}
return v.TimeBytes(), v.ValueBytes()
}
// nextForward advances the cursor forward returning the next value
func (c *walCursor) nextForward() Value {
c.position++
if c.position >= len(c.cache) {
return &EmptyValue{}
}
return c.cache[c.position]
}
// nextReverse advances the cursor backwards returning the next value
func (c *walCursor) nextReverse() Value {
c.position--
if c.position < 0 {
return &EmptyValue{}
}
return c.cache[c.position]
}
// idFromFileName parses the segment file ID from its name
func idFromFileName(name string) (int, error) {
parts := strings.Split(filepath.Base(name), ".")
if len(parts) != 2 {
return 0, fmt.Errorf("file %s has wrong name format to have an id", name)
}
id, err := strconv.ParseUint(parts[0][1:], 10, 32)
return int(id), err
}