Merge pull request #4308 from influxdb/pd-storage-engine

The TSM storage engine
Paul Dix 2015-10-06 15:54:56 -07:00
commit f041939a1c
34 changed files with 7928 additions and 53 deletions


@ -37,7 +37,9 @@ reporting-disabled = false
[data]
dir = "/var/opt/influxdb/data"
# Controls the engine type for new shards.
# Controls the engine type for new shards. Options are b1, bz1, or tsm1.
# b1 is the 0.9.2 storage engine, bz1 is the 0.9.3 and 0.9.4 engine.
# tsm1 is the 0.9.5 engine
# engine ="bz1"
# The following WAL settings are for the b1 storage engine used in 0.9.2. They won't


@ -1021,6 +1021,10 @@ func (p *point) Tags() Tags {
i, key = scanTo(p.key, i, '=')
i, value = scanTagValue(p.key, i+1)
if len(value) == 0 {
continue
}
tags[string(unescapeTag(key))] = string(unescapeTag(value))
i += 1
@ -1141,7 +1145,10 @@ func (t Tags) HashKey() []byte {
for k, v := range t {
ek := escapeTag([]byte(k))
ev := escapeTag([]byte(v))
escaped[string(ek)] = string(ev)
if len(ev) > 0 {
escaped[string(ek)] = string(ev)
}
}
// Extract keys and determine final size.


@ -605,6 +605,18 @@ func TestParsePointUnescape(t *testing.T) {
},
time.Unix(0, 0)))
// tag with no value
test(t, `cpu,regions=east value="1"`,
models.NewPoint("cpu",
models.Tags{
"regions": "east",
"foobar": "",
},
models.Fields{
"value": "1",
},
time.Unix(0, 0)))
// commas in field values
test(t, `cpu,regions=east value="1,0"`,
models.NewPoint("cpu",


@ -19,6 +19,7 @@ import (
// Ensure the service can return shard data.
func TestService_handleConn(t *testing.T) {
t.Skip("not implemented for tsm1 engine")
s := MustOpenService()
defer s.Close()


@ -42,7 +42,15 @@ const (
// we'll need to create backpressure, otherwise we'll fill up the memory and die.
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
DefaultPartitionSizeThreshold = 50 * 1024 * 1024 // 50MB
// Default WAL settings for the TSM1 WAL
DefaultFlushMemorySizeThreshold = 5 * 1024 * 1024 // 5MB
DefaultMaxMemorySizeThreshold = 100 * 1024 * 1024 // 100MB
DefaultIndexCompactionAge = time.Minute
DefaultIndexMinCompactionInterval = time.Minute
DefaultIndexMinCompactionFileCount = 5
DefaultIndexCompactionFullAge = 5 * time.Minute
)
type Config struct {
@ -63,6 +71,28 @@ type Config struct {
WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"`
WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"`
// WAL configuration options for tsm1 introduced in 0.9.5
WALFlushMemorySizeThreshold int `toml:"wal-flush-memory-size-threshold"`
WALMaxMemorySizeThreshold int `toml:"wal-max-memory-size-threshold"`
// compaction options for tsm1 introduced in 0.9.5
// IndexCompactionAge specifies the duration after the data file creation time
// at which it is eligible to be compacted
IndexCompactionAge time.Duration `toml:"index-compaction-age"`
// IndexMinimumCompactionInterval specifies the minimum amount of time that must
// pass after a compaction before another compaction is run
IndexMinCompactionInterval time.Duration `toml:"index-min-compaction-interval"`
// IndexCompactionFileCount specifies the minimum number of data files that
// must be eligible for compaction before actually running one
IndexMinCompactionFileCount int `toml:"index-compaction-min-file-count"`
// IndexCompactionFullAge specifies how long after the last write was received
// in the WAL that a full compaction should be performed.
IndexCompactionFullAge time.Duration `toml:"index-compaction-full-age"`
// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`
}
@ -74,12 +104,18 @@ func NewConfig() Config {
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay),
WALLoggingEnabled: true,
WALReadySeriesSize: DefaultReadySeriesSize,
WALCompactionThreshold: DefaultCompactionThreshold,
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
WALLoggingEnabled: true,
WALReadySeriesSize: DefaultReadySeriesSize,
WALCompactionThreshold: DefaultCompactionThreshold,
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
WALFlushMemorySizeThreshold: DefaultFlushMemorySizeThreshold,
WALMaxMemorySizeThreshold: DefaultMaxMemorySizeThreshold,
IndexCompactionAge: DefaultIndexCompactionAge,
IndexMinCompactionFileCount: DefaultIndexMinCompactionFileCount,
IndexCompactionFullAge: DefaultIndexCompactionFullAge,
IndexMinCompactionInterval: DefaultIndexMinCompactionInterval,
QueryLogEnabled: true,
}
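
To make the new tsm1 configuration options concrete, here is a minimal sketch of overriding them on top of the defaults shown above. It assumes only the tsdb API in this hunk (NewConfig plus the WAL* and Index* fields); the override values themselves are illustrative, not recommendations.

package main

import (
	"time"

	"github.com/influxdb/influxdb/tsdb"
)

func main() {
	cfg := tsdb.NewConfig()

	// tsm1 WAL thresholds: flush to the index at 10MB, hard ceiling at 200MB.
	cfg.WALFlushMemorySizeThreshold = 10 * 1024 * 1024
	cfg.WALMaxMemorySizeThreshold = 200 * 1024 * 1024

	// tsm1 compaction knobs: wait longer between compactions and require more
	// eligible files than the defaults before compacting.
	cfg.IndexCompactionAge = 2 * time.Minute
	cfg.IndexMinCompactionInterval = 2 * time.Minute
	cfg.IndexMinCompactionFileCount = 10
	cfg.IndexCompactionFullAge = 10 * time.Minute

	_ = cfg // hand cfg to the store as usual
}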


@ -24,7 +24,7 @@ type Engine interface {
Close() error
SetLogOutput(io.Writer)
LoadMetadataIndex(index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error
LoadMetadataIndex(shard *Shard, index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error
Begin(writable bool) (Tx, error)
WritePoints(points []models.Point, measurementFieldsToSave map[string]*MeasurementFields, seriesToCreate []*SeriesCreate) error
@ -32,9 +32,23 @@ type Engine interface {
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)
// PerformMaintenance will get called periodically by the store
PerformMaintenance()
// Format will return the format for the engine
Format() EngineFormat
io.WriterTo
}
type EngineFormat int
const (
B1Format EngineFormat = iota
BZ1Format
TSM1Format
)
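// A minimal sketch of how callers can branch on the new Format method; the
// formatName helper is hypothetical and not part of this package.
func formatName(e Engine) string {
	switch e.Format() {
	case B1Format:
		return "b1"
	case BZ1Format:
		return "bz1"
	case TSM1Format:
		return "tsm1"
	default:
		return "unknown"
	}
}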
// NewEngineFunc creates a new engine.
type NewEngineFunc func(path string, walPath string, options EngineOptions) Engine
@ -57,9 +71,24 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro
return newEngineFuncs[options.EngineVersion](path, walPath, options), nil
}
// Only bolt-based backends are currently supported so open it and check the format.
// Only bolt and tsm1 based storage engines are currently supported
var format string
if err := func() error {
// if it's a dir then it's a tsm1 engine
f, err := os.Open(path)
if err != nil {
return err
}
fi, err := f.Stat()
f.Close()
if err != nil {
return err
}
if fi.Mode().IsDir() {
format = "tsm1"
return nil
}
db, err := bolt.Open(path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return err

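The same NewEngine entry point now serves all three formats: an existing directory is opened as tsm1, while a bolt file is opened and its format read from its contents; EngineVersion only decides the format of shards that do not exist yet. A hedged usage sketch (NewEngineOptions is assumed here to be the EngineOptions constructor; the paths are illustrative):

package main

import (
	"log"

	"github.com/influxdb/influxdb/tsdb"
	_ "github.com/influxdb/influxdb/tsdb/engine" // registers b1, bz1 and tsm1
)

func main() {
	opts := tsdb.NewEngineOptions()
	opts.EngineVersion = "tsm1" // used only when the shard does not exist yet

	e, err := tsdb.NewEngine("/var/opt/influxdb/data/db/rp/1", "/var/opt/influxdb/wal/db/rp/1", opts)
	if err != nil {
		log.Fatal(err)
	}
	if err := e.Open(); err != nil {
		log.Fatal(err)
	}
	defer e.Close()
}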

@ -91,6 +91,14 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
// Path returns the path the engine was initialized with.
func (e *Engine) Path() string { return e.path }
// PerformMaintenance is for periodic maintenance of the store. A no-op for b1
func (e *Engine) PerformMaintenance() {}
// Format returns the format type of this engine
func (e *Engine) Format() tsdb.EngineFormat {
return tsdb.B1Format
}
// Open opens and initializes the engine.
func (e *Engine) Open() error {
if err := func() error {
@ -174,7 +182,7 @@ func (e *Engine) close() error {
func (e *Engine) SetLogOutput(w io.Writer) { e.LogOutput = w }
// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
return e.db.View(func(tx *bolt.Tx) error {
// load measurement metadata
meta := tx.Bucket([]byte("fields"))


@ -21,7 +21,7 @@ func TestEngine_WritePoints(t *testing.T) {
// Create metadata.
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
mf.CreateFieldIfNotExists("value", influxql.Float)
mf.CreateFieldIfNotExists("value", influxql.Float, true)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: tsdb.NewSeries(string(models.MakeKey([]byte("temperature"), nil)), nil)},
}
@ -84,7 +84,7 @@ func TestEngine_WritePoints_Reverse(t *testing.T) {
// Create metadata.
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
mf.CreateFieldIfNotExists("value", influxql.Float)
mf.CreateFieldIfNotExists("value", influxql.Float, true)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: tsdb.NewSeries(string(models.MakeKey([]byte("temperature"), nil)), nil)},
}


@ -114,6 +114,14 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
// Path returns the path the engine was opened with.
func (e *Engine) Path() string { return e.path }
// PerformMaintenance is for periodic maintenance of the store. A no-op for bz1
func (e *Engine) PerformMaintenance() {}
// Format returns the format type of this engine
func (e *Engine) Format() tsdb.EngineFormat {
return tsdb.BZ1Format
}
// Open opens and initializes the engine.
func (e *Engine) Open() error {
if err := func() error {
@ -176,7 +184,7 @@ func (e *Engine) close() error {
func (e *Engine) SetLogOutput(w io.Writer) {}
// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
if err := e.db.View(func(tx *bolt.Tx) error {
// Load measurement metadata
fields, err := e.readFields(tx)


@ -38,7 +38,7 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) {
// Load metadata index.
index := tsdb.NewDatabaseIndex()
if err := e.LoadMetadataIndex(index, make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
@ -80,7 +80,7 @@ func TestEngine_LoadMetadataIndex_Fields(t *testing.T) {
// Load metadata index.
mfs := make(map[string]*tsdb.MeasurementFields)
if err := e.LoadMetadataIndex(tsdb.NewDatabaseIndex(), mfs); err != nil {
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex(), mfs); err != nil {
t.Fatal(err)
}


@ -3,4 +3,5 @@ package engine
import (
_ "github.com/influxdb/influxdb/tsdb/engine/b1"
_ "github.com/influxdb/influxdb/tsdb/engine/bz1"
_ "github.com/influxdb/influxdb/tsdb/engine/tsm1"
)

tsdb/engine/tsm1/bool.go (new file)

@ -0,0 +1,135 @@
package tsm1
// bool encoding uses 1 bit per value. Each compressed byte slice contains a 1 byte header
// indicating the compression type, followed by a variable byte encoded length indicating
// how many booleans are packed in the slice. The remaining bytes contain 1 byte for every
// 8 boolean values encoded.
import "encoding/binary"
const (
// boolUncompressed is an uncompressed boolean format
boolUncompressed = 0
// boolCompressedBitPacked is a bit-packed format using 1 bit per boolean
boolCompressedBitPacked = 1
)
type BoolEncoder interface {
Write(b bool)
Bytes() ([]byte, error)
}
type BoolDecoder interface {
Next() bool
Read() bool
Error() error
}
type boolEncoder struct {
// The encoded bytes
bytes []byte
// The current byte being encoded
b byte
// The number of bools packed into b
i int
// The total number of bools written
n int
}
func NewBoolEncoder() BoolEncoder {
return &boolEncoder{}
}
func (e *boolEncoder) Write(b bool) {
// If we have filled the current byte, flush it
if e.i >= 8 {
e.flush()
}
// Use 1 bit for each boolean value, shift the current byte
// by 1 and set the least significant bit accordingly
e.b = e.b << 1
if b {
e.b |= 1
}
// Increment the current bool count
e.i += 1
// Increment the total bool count
e.n += 1
}
func (e *boolEncoder) flush() {
// Pad remaining byte w/ 0s
for e.i < 8 {
e.b = e.b << 1
e.i += 1
}
// If we have bits set, append them to the byte slice
if e.i > 0 {
e.bytes = append(e.bytes, e.b)
e.b = 0
e.i = 0
}
}
func (e *boolEncoder) Bytes() ([]byte, error) {
// Ensure the current byte is flushed
e.flush()
b := make([]byte, 10+1)
// Store the encoding type in the 4 high bits of the first byte
b[0] = byte(boolCompressedBitPacked) << 4
i := 1
// Encode the number of bools written
i += binary.PutUvarint(b[i:], uint64(e.n))
// Append the packed booleans
return append(b[:i], e.bytes...), nil
}
type boolDecoder struct {
b []byte
i int
n int
err error
}
func NewBoolDecoder(b []byte) BoolDecoder {
// The first byte stores the encoding type. Only the bit-packed format
// exists currently, so ignore it for now.
b = b[1:]
count, n := binary.Uvarint(b)
return &boolDecoder{b: b[n:], i: -1, n: int(count)}
}
func (e *boolDecoder) Next() bool {
e.i += 1
return e.i < e.n
}
func (e *boolDecoder) Read() bool {
// Index into the byte slice
idx := e.i / 8
// Bit position
pos := (8 - e.i%8) - 1
// The mask to select the bit
mask := byte(1 << uint(pos))
// The packed byte
v := e.b[idx]
// Returns true if the bit is set
return v&mask == mask
}
func (e *boolDecoder) Error() error {
return e.err
}
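
A short sketch of the layout described in the header comment: the high 4 bits of the first byte carry the encoding type, a variable byte encoded count of booleans follows, and the rest is the bit-packed payload. The helper is hypothetical and would live inside package tsm1, since it relies on the unexported constants above.

func inspectBoolBlock(b []byte) (encoding byte, count uint64, packed []byte) {
	// Encoding type lives in the high 4 bits of the first byte
	// (boolCompressedBitPacked for the bit-packed format).
	encoding = b[0] >> 4

	// A variable byte encoded count of booleans follows the type byte.
	count, n := binary.Uvarint(b[1:])

	// The remaining bytes hold the packed values, 8 booleans per byte,
	// most significant bit first.
	packed = b[1+n:]
	return encoding, count, packed
}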


@ -0,0 +1,73 @@
package tsm1_test
import (
"testing"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func Test_BoolEncoder_NoValues(t *testing.T) {
enc := tsm1.NewBoolEncoder()
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewBoolDecoder(b)
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}
func Test_BoolEncoder_Single(t *testing.T) {
enc := tsm1.NewBoolEncoder()
v1 := true
enc.Write(v1)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewBoolDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got false, exp true")
}
if v1 != dec.Read() {
t.Fatalf("unexpected value: got %v, exp %v", dec.Read(), v1)
}
}
func Test_BoolEncoder_Multi_Compressed(t *testing.T) {
enc := tsm1.NewBoolEncoder()
values := make([]bool, 10)
for i := range values {
values[i] = i%2 == 0
enc.Write(values[i])
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if exp := 4; len(b) != exp {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
dec := tsm1.NewBoolDecoder(b)
for i, v := range values {
if !dec.Next() {
t.Fatalf("unexpected next value: got false, exp true")
}
if v != dec.Read() {
t.Fatalf("unexpected value at pos %d: got %v, exp %v", i, dec.Read(), v)
}
}
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}

tsdb/engine/tsm1/cursor.go (new file)

@ -0,0 +1,481 @@
package tsm1
import (
"math"
"github.com/influxdb/influxdb/tsdb"
)
// combinedEngineCursor holds a cursor for the WAL and the index
// and will combine the two together. Any point in the WAL with the
// same timestamp as a point in the index will be preferred over the
// index point
type combinedEngineCursor struct {
walCursor tsdb.Cursor
engineCursor tsdb.Cursor
walKeyBuf int64
walValueBuf interface{}
engineKeyBuf int64
engineValueBuf interface{}
ascending bool
}
func NewCombinedEngineCursor(wc, ec tsdb.Cursor, ascending bool) tsdb.Cursor {
return &combinedEngineCursor{
walCursor: wc,
engineCursor: ec,
ascending: ascending,
}
}
// SeekTo will seek both the index and WAL cursor
func (c *combinedEngineCursor) SeekTo(seek int64) (key int64, value interface{}) {
c.walKeyBuf, c.walValueBuf = c.walCursor.SeekTo(seek)
c.engineKeyBuf, c.engineValueBuf = c.engineCursor.SeekTo(seek)
return c.read()
}
// Next returns the next value in the cursor
func (c *combinedEngineCursor) Next() (int64, interface{}) {
return c.read()
}
// Ascending returns true if the cursor is time ascending
func (c *combinedEngineCursor) Ascending() bool {
return c.ascending
}
// read will return the buffer value that is next from either the
// WAL or index cursor and repopulate the buffer value with the
// appropriate cursor's next value
func (c *combinedEngineCursor) read() (key int64, value interface{}) {
if c.walKeyBuf == tsdb.EOF && c.engineKeyBuf == tsdb.EOF {
return tsdb.EOF, nil
}
// handle the case where they have the same point
if c.walKeyBuf == c.engineKeyBuf {
// keep the wal value since it will overwrite the engine value
key = c.walKeyBuf
value = c.walValueBuf
c.walKeyBuf, c.walValueBuf = c.walCursor.Next()
// overwrite the buffered engine values
c.engineKeyBuf, c.engineValueBuf = c.engineCursor.Next()
return
}
// ascending order
if c.ascending {
if c.walKeyBuf != tsdb.EOF && (c.walKeyBuf < c.engineKeyBuf || c.engineKeyBuf == tsdb.EOF) {
key = c.walKeyBuf
value = c.walValueBuf
c.walKeyBuf, c.walValueBuf = c.walCursor.Next()
return
}
key = c.engineKeyBuf
value = c.engineValueBuf
c.engineKeyBuf, c.engineValueBuf = c.engineCursor.Next()
return
}
// descending order
if c.walKeyBuf != tsdb.EOF && c.walKeyBuf > c.engineKeyBuf {
key = c.walKeyBuf
value = c.walValueBuf
c.walKeyBuf, c.walValueBuf = c.walCursor.Next()
return
}
key = c.engineKeyBuf
value = c.engineValueBuf
c.engineKeyBuf, c.engineValueBuf = c.engineCursor.Next()
return
}
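// Minimal usage sketch (hypothetical helper, assuming two cursors obtained from
// the WAL and the index): the combined cursor yields each timestamp once and,
// when both sides carry the same timestamp, returns the WAL value.
func drainCombined(wc, ec tsdb.Cursor, start int64) {
	c := NewCombinedEngineCursor(wc, ec, true)
	for k, v := c.SeekTo(start); k != tsdb.EOF; k, v = c.Next() {
		_ = v // use the value; WAL values win on timestamp collisions
	}
}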
// multiFieldCursor wraps cursors for multiple fields on the same series
// key. Instead of returning a plain interface value in the call for Next(),
// it returns a map[string]interface{} for the field values
type multiFieldCursor struct {
fields []string
cursors []tsdb.Cursor
ascending bool
keyBuffer []int64
valueBuffer []interface{}
}
func NewMultiFieldCursor(fields []string, cursors []tsdb.Cursor, ascending bool) tsdb.Cursor {
return &multiFieldCursor{
fields: fields,
cursors: cursors,
ascending: ascending,
keyBuffer: make([]int64, len(cursors)),
valueBuffer: make([]interface{}, len(cursors)),
}
}
func (m *multiFieldCursor) SeekTo(seek int64) (key int64, value interface{}) {
for i, c := range m.cursors {
m.keyBuffer[i], m.valueBuffer[i] = c.SeekTo(seek)
}
return m.read()
}
func (m *multiFieldCursor) Next() (int64, interface{}) {
return m.read()
}
func (m *multiFieldCursor) Ascending() bool {
return m.ascending
}
func (m *multiFieldCursor) read() (int64, interface{}) {
t := int64(math.MaxInt64)
if !m.ascending {
t = int64(math.MinInt64)
}
// find the time we need to combine all fields
for _, k := range m.keyBuffer {
if k == tsdb.EOF {
continue
}
if m.ascending && t > k {
t = k
} else if !m.ascending && t < k {
t = k
}
}
// get the value and advance each of the cursors that have the matching time
if t == math.MinInt64 || t == math.MaxInt64 {
return tsdb.EOF, nil
}
mm := make(map[string]interface{})
for i, k := range m.keyBuffer {
if k == t {
mm[m.fields[i]] = m.valueBuffer[i]
m.keyBuffer[i], m.valueBuffer[i] = m.cursors[i].Next()
}
}
return t, mm
}
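// Minimal usage sketch (hypothetical helper): unlike the single-field cursors,
// each step of the multi-field cursor returns a map of field name to value for
// one timestamp, built from whichever wrapped cursors have data at that time.
func drainMultiField(fields []string, cursors []tsdb.Cursor, start int64) {
	c := NewMultiFieldCursor(fields, cursors, true)
	for k, v := c.SeekTo(start); k != tsdb.EOF; k, v = c.Next() {
		fieldValues := v.(map[string]interface{}) // e.g. {"value": 1.0, "status": "ok"}
		_ = fieldValues
	}
}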
type emptyCursor struct {
ascending bool
}
func (c *emptyCursor) Next() (int64, interface{}) { return tsdb.EOF, nil }
func (c *emptyCursor) SeekTo(key int64) (int64, interface{}) { return tsdb.EOF, nil }
func (c *emptyCursor) Ascending() bool { return c.ascending }
// cursor is a cursor for the data in the index
type cursor struct {
// id for the series key and field
id uint64
// f is the current data file we're reading from
f *dataFile
// filesPos is the position in the files index we're reading from
filesPos int // the index in the files slice we're looking at
// pos is the position in the current data file we're reading
pos uint32
// vals is the current decoded block of Values we're iterating from
vals Values
ascending bool
// blockPositions is used for descending queries to keep track
// of the positions in the current data file where encoded blocks
// for this id exist
blockPositions []uint32
// time ascending slice of read-only data files
files []*dataFile
}
func newCursor(id uint64, files []*dataFile, ascending bool) *cursor {
return &cursor{
id: id,
ascending: ascending,
files: files,
}
}
func (c *cursor) SeekTo(seek int64) (int64, interface{}) {
if len(c.files) == 0 {
return tsdb.EOF, nil
}
if c.ascending {
if seek <= c.files[0].MinTime() {
c.filesPos = 0
c.f = c.files[0]
} else {
for i, f := range c.files {
if seek >= f.MinTime() && seek <= f.MaxTime() {
c.filesPos = i
c.f = f
break
}
}
}
} else {
if seek >= c.files[len(c.files)-1].MaxTime() {
c.filesPos = len(c.files) - 1
c.f = c.files[c.filesPos]
} else if seek < c.files[0].MinTime() {
return tsdb.EOF, nil
} else {
for i, f := range c.files {
if seek >= f.MinTime() && seek <= f.MaxTime() {
c.filesPos = i
c.f = f
break
}
}
}
}
if c.f == nil {
return tsdb.EOF, nil
}
// find the first file we need to check in
for {
if c.filesPos < 0 || c.filesPos >= len(c.files) {
return tsdb.EOF, nil
}
c.f = c.files[c.filesPos]
c.pos = c.f.StartingPositionForID(c.id)
// if this id isn't in this file, move to next one or return
if c.pos == 0 {
if c.ascending {
c.filesPos++
} else {
c.filesPos--
c.blockPositions = nil
}
continue
}
// handle seek for correct order
k := tsdb.EOF
var v interface{}
if c.ascending {
k, v = c.seekAscending(seek)
} else {
k, v = c.seekDescending(seek)
}
if k != tsdb.EOF {
return k, v
}
if c.ascending {
c.filesPos++
} else {
c.filesPos--
c.blockPositions = nil
}
}
}
func (c *cursor) seekAscending(seek int64) (int64, interface{}) {
// seek to the block and values we're looking for
for {
// if the time is between this block and the next,
// decode this block and go, otherwise seek to next block
length := c.blockLength(c.pos)
// if the next block has a time less than what we're seeking to,
// skip decoding this block and continue on
nextBlockPos := c.pos + blockHeaderSize + length
if nextBlockPos < c.f.indexPosition() {
nextBlockID := btou64(c.f.mmap[nextBlockPos : nextBlockPos+8])
if nextBlockID == c.id {
nextBlockTime := c.blockMinTime(nextBlockPos)
if nextBlockTime <= seek {
c.pos = nextBlockPos
continue
}
}
}
// it must be in this block or not at all
id := btou64((c.f.mmap[c.pos : c.pos+8]))
if id != c.id {
return tsdb.EOF, nil
}
c.decodeBlock(c.pos)
// see if we can find it in this block
for i, v := range c.vals {
if v.Time().UnixNano() >= seek {
c.vals = c.vals[i+1:]
return v.Time().UnixNano(), v.Value()
}
}
}
}
func (c *cursor) seekDescending(seek int64) (int64, interface{}) {
c.setBlockPositions()
if len(c.blockPositions) == 0 {
return tsdb.EOF, nil
}
for i := len(c.blockPositions) - 1; i >= 0; i-- {
pos := c.blockPositions[i]
if c.blockMinTime(pos) > seek {
continue
}
c.decodeBlock(pos)
c.blockPositions = c.blockPositions[:i]
for i := len(c.vals) - 1; i >= 0; i-- {
val := c.vals[i]
if seek >= val.UnixNano() {
c.vals = c.vals[:i]
return val.UnixNano(), val.Value()
}
if seek < val.UnixNano() {
// we need to move to the next block
if i == 0 {
break
}
val := c.vals[i-1]
c.vals = c.vals[:i-1]
return val.UnixNano(), val.Value()
}
}
c.blockPositions = c.blockPositions[:i]
}
return tsdb.EOF, nil
}
// blockMinTime is the minimum time for the block
func (c *cursor) blockMinTime(pos uint32) int64 {
return int64(btou64(c.f.mmap[pos+12 : pos+20]))
}
// setBlockPositions will read the positions of all
// blocks for the cursor id in the given data file
func (c *cursor) setBlockPositions() {
pos := c.pos
for {
if pos >= c.f.indexPosition() {
return
}
length := c.blockLength(pos)
id := btou64(c.f.mmap[pos : pos+8])
if id != c.id {
return
}
c.blockPositions = append(c.blockPositions, pos)
pos += blockHeaderSize + length
}
}
func (c *cursor) Next() (int64, interface{}) {
if c.ascending {
k, v := c.nextAscending()
return k, v
}
return c.nextDescending()
}
func (c *cursor) nextAscending() (int64, interface{}) {
if len(c.vals) > 0 {
v := c.vals[0]
c.vals = c.vals[1:]
return v.Time().UnixNano(), v.Value()
}
// if we have a file set, see if the next block is for this ID
if c.f != nil && c.pos < c.f.indexPosition() {
nextBlockID := btou64(c.f.mmap[c.pos : c.pos+8])
if nextBlockID == c.id {
c.decodeBlock(c.pos)
return c.nextAscending()
}
}
// loop through the files until we hit the next one that has this id
for {
c.filesPos++
if c.filesPos >= len(c.files) {
return tsdb.EOF, nil
}
c.f = c.files[c.filesPos]
startingPos := c.f.StartingPositionForID(c.id)
if startingPos == 0 {
// move to next file because it isn't in this one
continue
}
// we have a block with this id, decode and return
c.decodeBlock(startingPos)
return c.nextAscending()
}
}
func (c *cursor) nextDescending() (int64, interface{}) {
if len(c.vals) > 0 {
v := c.vals[len(c.vals)-1]
if len(c.vals) >= 1 {
c.vals = c.vals[:len(c.vals)-1]
} else {
c.vals = nil
}
return v.UnixNano(), v.Value()
}
for i := len(c.blockPositions) - 1; i >= 0; i-- {
c.decodeBlock(c.blockPositions[i])
c.blockPositions = c.blockPositions[:i]
if len(c.vals) == 0 {
continue
}
val := c.vals[len(c.vals)-1]
c.vals = c.vals[:len(c.vals)-1]
return val.UnixNano(), val.Value()
}
return tsdb.EOF, nil
}
func (c *cursor) blockLength(pos uint32) uint32 {
return btou32(c.f.mmap[pos+8 : pos+12])
}
// decodeBlock will decode the block and set the vals
func (c *cursor) decodeBlock(position uint32) {
length := c.blockLength(position)
block := c.f.mmap[position+blockHeaderSize : position+blockHeaderSize+length]
c.vals, _ = DecodeBlock(block)
// only advance the position if we're ascending.
// Descending queries use the blockPositions
if c.ascending {
c.pos = position + blockHeaderSize + length
}
}
func (c *cursor) Ascending() bool { return c.ascending }


@ -0,0 +1,554 @@
package tsm1
import (
"encoding/binary"
"fmt"
"sort"
"time"
"github.com/influxdb/influxdb/tsdb"
)
const (
// BlockFloat64 designates a block encodes float64 values
BlockFloat64 = 0
// BlockInt64 designates a block encodes int64 values
BlockInt64 = 1
// BlockBool designates a block encodes bool values
BlockBool = 2
// BlockString designates a block encodes string values
BlockString = 3
// encodedBlockHeaderSize is the size of the header for an encoded block. The first 8 bytes
// are the minimum timestamp of the block. The next byte is a block encoding type indicator.
encodedBlockHeaderSize = 9
)
type Value interface {
Time() time.Time
UnixNano() int64
Value() interface{}
Size() int
}
func NewValue(t time.Time, value interface{}) Value {
switch v := value.(type) {
case int64:
return &Int64Value{time: t, value: v}
case float64:
return &FloatValue{time: t, value: v}
case bool:
return &BoolValue{time: t, value: v}
case string:
return &StringValue{time: t, value: v}
}
return &EmptyValue{}
}
type EmptyValue struct {
}
func (e *EmptyValue) UnixNano() int64 { return tsdb.EOF }
func (e *EmptyValue) Time() time.Time { return time.Unix(0, tsdb.EOF) }
func (e *EmptyValue) Value() interface{} { return nil }
func (e *EmptyValue) Size() int { return 0 }
// Values represents a time-ascending sorted collection of Value types.
// the underlying type should be the same across all values, but the interface
// makes the code cleaner.
type Values []Value
func (v Values) MinTime() int64 {
return v[0].Time().UnixNano()
}
func (v Values) MaxTime() int64 {
return v[len(v)-1].Time().UnixNano()
}
func (v Values) Encode(buf []byte) ([]byte, error) {
switch v[0].(type) {
case *FloatValue:
return encodeFloatBlock(buf, v)
case *Int64Value:
return encodeInt64Block(buf, v)
case *BoolValue:
return encodeBoolBlock(buf, v)
case *StringValue:
return encodeStringBlock(buf, v)
}
return nil, fmt.Errorf("unsupported value type %T", v[0])
}
func (v Values) DecodeSameTypeBlock(block []byte) Values {
switch v[0].(type) {
case *FloatValue:
a, _ := decodeFloatBlock(block)
return a
case *Int64Value:
a, _ := decodeInt64Block(block)
return a
case *BoolValue:
a, _ := decodeBoolBlock(block)
return a
case *StringValue:
a, _ := decodeStringBlock(block)
return a
}
return nil
}
// DecodeBlock takes a byte array and will decode into values of the appropriate type
// based on the block
func DecodeBlock(block []byte) (Values, error) {
if len(block) <= encodedBlockHeaderSize {
panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
}
blockType := block[8]
switch blockType {
case BlockFloat64:
return decodeFloatBlock(block)
case BlockInt64:
return decodeInt64Block(block)
case BlockBool:
return decodeBoolBlock(block)
case BlockString:
return decodeStringBlock(block)
default:
panic(fmt.Sprintf("unknown block type: %d", blockType))
}
}
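// Illustrative sketch (hypothetical helper): every encoded block begins with the
// 8 byte minimum timestamp followed by the 1 byte block type that DecodeBlock
// switches on above. btou64 is the package's existing byte-to-uint64 helper.
func blockHeader(block []byte) (minTime int64, blockType byte) {
	minTime = int64(btou64(block[:8]))
	blockType = block[8]
	return minTime, blockType
}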
// Deduplicate returns a new Values slice with any values
// that have the same timestamp removed. The Value that appears
// last in the slice is the one that is kept. The returned slice is in ascending order
func (v Values) Deduplicate() Values {
m := make(map[int64]Value)
for _, val := range v {
m[val.UnixNano()] = val
}
a := make([]Value, 0, len(m))
for _, val := range m {
a = append(a, val)
}
sort.Sort(Values(a))
return a
}
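// Illustrative sketch of Deduplicate (hypothetical helper): the value written
// last for a given timestamp wins, and the result comes back in ascending
// time order.
func deduplicateExample() Values {
	t0 := time.Unix(0, 0)
	vals := Values{
		NewValue(t0.Add(2*time.Second), 2.0),
		NewValue(t0.Add(1*time.Second), 1.0),
		NewValue(t0.Add(2*time.Second), 20.0), // replaces the earlier 2.0
	}
	// Returns [t0+1s -> 1.0, t0+2s -> 20.0].
	return vals.Deduplicate()
}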
// Sort methods
func (a Values) Len() int { return len(a) }
func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a Values) Less(i, j int) bool { return a[i].Time().UnixNano() < a[j].Time().UnixNano() }
type FloatValue struct {
time time.Time
value float64
}
func (f *FloatValue) Time() time.Time {
return f.time
}
func (f *FloatValue) UnixNano() int64 {
return f.time.UnixNano()
}
func (f *FloatValue) Value() interface{} {
return f.value
}
func (f *FloatValue) Size() int {
return 16
}
func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
// A float block is encoded using different compression strategies
// for timestamps and values.
// Encode values using Gorilla float compression
venc := NewFloatEncoder()
// Encode timestamps using an adaptive encoder that uses delta-encoding,
// frame-of-reference and run length encoding.
tsenc := NewTimeEncoder()
for _, v := range values {
tsenc.Write(v.Time())
venc.Push(v.(*FloatValue).value)
}
venc.Finish()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return nil, err
}
// Encoded float values
vb := venc.Bytes()
// Prepend the first timestamp of the block in the first 8 bytes and the block type
// in the next byte, followed by the packed timestamp and value blocks
block := packBlockHeader(values[0].Time(), BlockFloat64)
block = append(block, packBlock(tb, vb)...)
return block, nil
}
func decodeFloatBlock(block []byte) ([]Value, error) {
// The first 8 bytes is the minimum timestamp of the block
block = block[8:]
// The block type is in the next byte; make sure we actually have a float block
blockType := block[0]
if blockType != BlockFloat64 {
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType)
}
block = block[1:]
tb, vb := unpackBlock(block)
// Setup our timestamp and value decoders
dec := NewTimeDecoder(tb)
iter, err := NewFloatDecoder(vb)
if err != nil {
return nil, err
}
// Decode both a timestamp and value
var a []Value
for dec.Next() && iter.Next() {
ts := dec.Read()
v := iter.Values()
a = append(a, &FloatValue{ts, v})
}
// Did timestamp decoding have an error?
if dec.Error() != nil {
return nil, dec.Error()
}
// Did float decoding have an error?
if iter.Error() != nil {
return nil, iter.Error()
}
return a, nil
}
type BoolValue struct {
time time.Time
value bool
}
func (b *BoolValue) Time() time.Time {
return b.time
}
func (b *BoolValue) Size() int {
return 9
}
func (b *BoolValue) UnixNano() int64 {
return b.time.UnixNano()
}
func (b *BoolValue) Value() interface{} {
return b.value
}
func encodeBoolBlock(buf []byte, values []Value) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
// A bool block is encoded using different compression strategies
// for timestamps and values.
// Encode values using the bool encoder
venc := NewBoolEncoder()
// Encode timestamps using an adaptive encoder
tsenc := NewTimeEncoder()
for _, v := range values {
tsenc.Write(v.Time())
venc.Write(v.(*BoolValue).value)
}
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return nil, err
}
// Encoded bool values
vb, err := venc.Bytes()
if err != nil {
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block type
// in the next byte, followed by the packed timestamp and value blocks
block := packBlockHeader(values[0].Time(), BlockBool)
block = append(block, packBlock(tb, vb)...)
return block, nil
}
func decodeBoolBlock(block []byte) ([]Value, error) {
// The first 8 bytes is the minimum timestamp of the block
block = block[8:]
// The block type is in the next byte; make sure we actually have a bool block
blockType := block[0]
if blockType != BlockBool {
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockBool, blockType)
}
block = block[1:]
tb, vb := unpackBlock(block)
// Setup our timestamp and value decoders
dec := NewTimeDecoder(tb)
vdec := NewBoolDecoder(vb)
// Decode both a timestamp and value
var a []Value
for dec.Next() && vdec.Next() {
ts := dec.Read()
v := vdec.Read()
a = append(a, &BoolValue{ts, v})
}
// Did timestamp decoding have an error?
if dec.Error() != nil {
return nil, dec.Error()
}
// Did bool decoding have an error?
if vdec.Error() != nil {
return nil, vdec.Error()
}
return a, nil
}
type Int64Value struct {
time time.Time
value int64
}
func (v *Int64Value) Time() time.Time {
return v.time
}
func (v *Int64Value) Value() interface{} {
return v.value
}
func (f *Int64Value) UnixNano() int64 {
return f.time.UnixNano()
}
func (v *Int64Value) Size() int {
return 16
}
func (v *Int64Value) String() string { return fmt.Sprintf("%v", v.value) }
func encodeInt64Block(buf []byte, values []Value) ([]byte, error) {
tsEnc := NewTimeEncoder()
vEnc := NewInt64Encoder()
for _, v := range values {
tsEnc.Write(v.Time())
vEnc.Write(v.(*Int64Value).value)
}
// Encoded timestamp values
tb, err := tsEnc.Bytes()
if err != nil {
return nil, err
}
// Encoded int64 values
vb, err := vEnc.Bytes()
if err != nil {
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes
block := packBlockHeader(values[0].Time(), BlockInt64)
return append(block, packBlock(tb, vb)...), nil
}
func decodeInt64Block(block []byte) ([]Value, error) {
// slice off the first 8 bytes (min timestamp for the block)
block = block[8:]
blockType := block[0]
if blockType != BlockInt64 {
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockInt64, blockType)
}
block = block[1:]
// The first 8 bytes is the minimum timestamp of the block
tb, vb := unpackBlock(block)
// Setup our timestamp and value decoders
tsDec := NewTimeDecoder(tb)
vDec := NewInt64Decoder(vb)
// Decode both a timestamp and value
var a []Value
for tsDec.Next() && vDec.Next() {
ts := tsDec.Read()
v := vDec.Read()
a = append(a, &Int64Value{ts, v})
}
// Did timestamp decoding have an error?
if tsDec.Error() != nil {
return nil, tsDec.Error()
}
// Did int64 decoding have an error?
if vDec.Error() != nil {
return nil, vDec.Error()
}
return a, nil
}
type StringValue struct {
time time.Time
value string
}
func (v *StringValue) Time() time.Time {
return v.time
}
func (v *StringValue) Value() interface{} {
return v.value
}
func (v *StringValue) UnixNano() int64 {
return v.time.UnixNano()
}
func (v *StringValue) Size() int {
return 8 + len(v.value)
}
func (v *StringValue) String() string { return v.value }
func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
tsEnc := NewTimeEncoder()
vEnc := NewStringEncoder()
for _, v := range values {
tsEnc.Write(v.Time())
vEnc.Write(v.(*StringValue).value)
}
// Encoded timestamp values
tb, err := tsEnc.Bytes()
if err != nil {
return nil, err
}
// Encoded string values
vb, err := vEnc.Bytes()
if err != nil {
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes
block := packBlockHeader(values[0].Time(), BlockString)
return append(block, packBlock(tb, vb)...), nil
}
func decodeStringBlock(block []byte) ([]Value, error) {
// slice off the first 8 bytes (min timestamp for the block)
block = block[8:]
blockType := block[0]
if blockType != BlockString {
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType)
}
block = block[1:]
// The first 8 bytes is the minimum timestamp of the block
tb, vb := unpackBlock(block)
// Setup our timestamp and value decoders
tsDec := NewTimeDecoder(tb)
vDec, err := NewStringDecoder(vb)
if err != nil {
return nil, err
}
// Decode both a timestamp and value
var a []Value
for tsDec.Next() && vDec.Next() {
ts := tsDec.Read()
v := vDec.Read()
a = append(a, &StringValue{ts, v})
}
// Did timestamp decoding have an error?
if tsDec.Error() != nil {
return nil, tsDec.Error()
}
// Did string decoding have an error?
if vDec.Error() != nil {
return nil, vDec.Error()
}
return a, nil
}
func packBlockHeader(firstTime time.Time, blockType byte) []byte {
return append(u64tob(uint64(firstTime.UnixNano())), blockType)
}
func packBlock(ts []byte, values []byte) []byte {
// We encode the length of the timestamp block using a variable byte encoding.
// This allows small byte slices to take up 1 byte while larger ones use 2 or more.
b := make([]byte, 10)
i := binary.PutUvarint(b, uint64(len(ts)))
// block is <len timestamp bytes>, <ts bytes>, <value bytes>
block := append(b[:i], ts...)
// We don't encode the value length because we know it's the rest of the block after
// the timestamp block.
return append(block, values...)
}
func unpackBlock(buf []byte) (ts, values []byte) {
// Unpack the timestamp block length
tsLen, i := binary.Uvarint(buf)
// Unpack the timestamp bytes
ts = buf[int(i) : int(i)+int(tsLen)]
// Unpack the value bytes
values = buf[int(i)+int(tsLen):]
return
}
// ZigZagEncode converts a int64 to a uint64 by zig zagging negative and positive values
// across even and odd numbers. Eg. [0,-1,1,-2] becomes [0, 1, 2, 3]
func ZigZagEncode(x int64) uint64 {
return uint64(uint64(x<<1) ^ uint64((int64(x) >> 63)))
}
// ZigZagDecode converts a previously zigzag encoded uint64 back to a int64
func ZigZagDecode(v uint64) int64 {
return int64((v >> 1) ^ uint64((int64(v&1)<<63)>>63))
}
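// Worked example of the zig zag mapping described above (hypothetical helper):
// the signed inputs [0, -1, 1, -2] encode to the unsigned values [0, 1, 2, 3],
// and ZigZagDecode inverts the mapping exactly.
func zigZagRoundTrip() bool {
	inputs := []int64{0, -1, 1, -2}
	expected := []uint64{0, 1, 2, 3}
	for i, v := range inputs {
		e := ZigZagEncode(v)
		if e != expected[i] || ZigZagDecode(e) != v {
			return false
		}
	}
	return true
}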


@ -0,0 +1,158 @@
package tsm1_test
import (
// "math/rand"
"fmt"
"reflect"
"testing"
"time"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func TestEncoding_FloatBlock(t *testing.T) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make(tsm1.Values, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, float64(i))
}
b, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
}
func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
values := make(tsm1.Values, 3)
for i := 0; i < 3; i++ {
values[i] = tsm1.NewValue(time.Unix(0, 0), float64(i))
}
b, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
}
func TestEncoding_IntBlock_Basic(t *testing.T) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make(tsm1.Values, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, int64(i))
}
b, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
if len(decodedValues) != len(values) {
t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
}
for i := 0; i < len(decodedValues); i++ {
if decodedValues[i].Time() != values[i].Time() {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].Time(), values[i].Time())
}
if decodedValues[i].Value() != values[i].Value() {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].Value(), values[i].Value())
}
}
}
func TestEncoding_IntBlock_Negatives(t *testing.T) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make(tsm1.Values, len(times))
for i, t := range times {
v := int64(i)
if i%2 == 0 {
v = -v
}
values[i] = tsm1.NewValue(t, int64(v))
}
b, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
}
func TestEncoding_BoolBlock_Basic(t *testing.T) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make(tsm1.Values, len(times))
for i, t := range times {
v := true
if i%2 == 0 {
v = false
}
values[i] = tsm1.NewValue(t, v)
}
b, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
}
func TestEncoding_StringBlock_Basic(t *testing.T) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make(tsm1.Values, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
}
b, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
}
func getTimes(n, step int, precision time.Duration) []time.Time {
t := time.Now().Round(precision)
a := make([]time.Time, n)
for i := 0; i < n; i++ {
a[i] = t.Add(time.Duration(i*60) * precision)
}
return a
}

tsdb/engine/tsm1/float.go (new file)

@ -0,0 +1,210 @@
package tsm1
/*
This code is originally from: https://github.com/dgryski/go-tsz and has been modified to remove
the timestamp compression functionality.
It implements the float compression as presented in: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
This implementation uses a sentinel value of NaN which means that float64 NaN cannot be stored using
this version.
*/
import (
"bytes"
"math"
"github.com/dgryski/go-bits"
"github.com/dgryski/go-bitstream"
)
const (
// floatUncompressed is an uncompressed format using 8 bytes per value
floatUncompressed = 0
// floatCompressedGorilla is a compressed format using the gorilla paper encoding
floatCompressedGorilla = 1
)
// FloatEncoder encodes multiple float64s into a byte slice
type FloatEncoder struct {
val float64
leading uint64
trailing uint64
buf bytes.Buffer
bw *bitstream.BitWriter
first bool
finished bool
}
func NewFloatEncoder() *FloatEncoder {
s := FloatEncoder{
first: true,
leading: ^uint64(0),
}
s.bw = bitstream.NewWriter(&s.buf)
return &s
}
func (s *FloatEncoder) Bytes() []byte {
return append([]byte{floatCompressedGorilla << 4}, s.buf.Bytes()...)
}
func (s *FloatEncoder) Finish() {
if !s.finished {
// write an end-of-stream record
s.Push(math.NaN())
s.bw.Flush(bitstream.Zero)
s.finished = true
}
}
func (s *FloatEncoder) Push(v float64) {
if s.first {
// first point
s.val = v
s.first = false
s.bw.WriteBits(math.Float64bits(v), 64)
return
}
vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)
if vDelta == 0 {
s.bw.WriteBit(bitstream.Zero)
} else {
s.bw.WriteBit(bitstream.One)
leading := bits.Clz(vDelta)
trailing := bits.Ctz(vDelta)
// TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead
if s.leading != ^uint64(0) && leading >= s.leading && trailing >= s.trailing {
s.bw.WriteBit(bitstream.Zero)
s.bw.WriteBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
} else {
s.leading, s.trailing = leading, trailing
s.bw.WriteBit(bitstream.One)
s.bw.WriteBits(leading, 5)
sigbits := 64 - leading - trailing
s.bw.WriteBits(sigbits, 6)
s.bw.WriteBits(vDelta>>trailing, int(sigbits))
}
}
s.val = v
}
// FloatDecoder decodes a byte slice into multiple float64 values
type FloatDecoder struct {
val float64
leading uint64
trailing uint64
br *bitstream.BitReader
b []byte
first bool
finished bool
err error
}
func NewFloatDecoder(b []byte) (*FloatDecoder, error) {
// first byte is the compression type but we currently just have gorilla
// compression
br := bitstream.NewReader(bytes.NewReader(b[1:]))
v, err := br.ReadBits(64)
if err != nil {
return nil, err
}
return &FloatDecoder{
val: math.Float64frombits(v),
first: true,
br: br,
b: b,
}, nil
}
func (it *FloatDecoder) Next() bool {
if it.err != nil || it.finished {
return false
}
if it.first {
it.first = false
return true
}
// read compressed value
bit, err := it.br.ReadBit()
if err != nil {
it.err = err
return false
}
if bit == bitstream.Zero {
// it.val = it.val
} else {
bit, err := it.br.ReadBit()
if err != nil {
it.err = err
return false
}
if bit == bitstream.Zero {
// reuse leading/trailing zero bits
// it.leading, it.trailing = it.leading, it.trailing
} else {
bits, err := it.br.ReadBits(5)
if err != nil {
it.err = err
return false
}
it.leading = bits
bits, err = it.br.ReadBits(6)
if err != nil {
it.err = err
return false
}
mbits := bits
it.trailing = 64 - it.leading - mbits
}
mbits := int(64 - it.leading - it.trailing)
bits, err := it.br.ReadBits(mbits)
if err != nil {
it.err = err
return false
}
vbits := math.Float64bits(it.val)
vbits ^= (bits << it.trailing)
val := math.Float64frombits(vbits)
if math.IsNaN(val) {
it.finished = true
return false
}
it.val = val
}
return true
}
func (it *FloatDecoder) Values() float64 {
return it.val
}
func (it *FloatDecoder) Error() error {
return it.err
}
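
Because NaN doubles as the end-of-stream sentinel (see the header comment), a NaN pushed as a data point ends decoding early and any later values are unreadable. A minimal sketch of that limitation, as a hypothetical helper inside package tsm1:

func floatNaNSentinelExample() int {
	enc := NewFloatEncoder()
	enc.Push(1.5)
	enc.Push(math.NaN()) // the decoder treats this as end-of-stream
	enc.Push(2.5)        // never reached by the decoder
	enc.Finish()

	dec, err := NewFloatDecoder(enc.Bytes())
	if err != nil {
		return 0
	}
	n := 0
	for dec.Next() {
		n++ // only the 1.5 before the NaN is read, so n ends up as 1
	}
	return n
}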


@ -0,0 +1,165 @@
package tsm1_test
import (
"testing"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func TestFloatEncoder_Simple(t *testing.T) {
// Example from the paper
s := tsm1.NewFloatEncoder()
s.Push(12)
s.Push(12)
s.Push(24)
// extra tests
// floating point masking/shifting bug
s.Push(13)
s.Push(24)
// delta-of-delta sizes
s.Push(24)
s.Push(24)
s.Push(24)
s.Finish()
b := s.Bytes()
it, err := tsm1.NewFloatDecoder(b)
if err != nil {
t.Fatalf("unexpected error creating float decoder: %v", err)
}
want := []float64{
12,
12,
24,
13,
24,
24,
24,
24,
}
for _, w := range want {
if !it.Next() {
t.Fatalf("Next()=false, want true")
}
vv := it.Values()
if w != vv {
t.Errorf("Values()=(%v), want (%v)\n", vv, w)
}
}
if it.Next() {
t.Fatalf("Next()=true, want false")
}
if err := it.Error(); err != nil {
t.Errorf("it.Error()=%v, want nil", err)
}
}
var TwoHoursData = []struct {
v float64
}{
// 2h of data
{761}, {727}, {763}, {706}, {700},
{679}, {757}, {708}, {739}, {707},
{699}, {740}, {729}, {766}, {730},
{715}, {705}, {693}, {765}, {724},
{799}, {761}, {737}, {766}, {756},
{719}, {722}, {801}, {747}, {731},
{742}, {744}, {791}, {750}, {759},
{809}, {751}, {705}, {770}, {792},
{727}, {762}, {772}, {721}, {748},
{753}, {744}, {716}, {776}, {659},
{789}, {766}, {758}, {690}, {795},
{770}, {758}, {723}, {767}, {765},
{693}, {706}, {681}, {727}, {724},
{780}, {678}, {696}, {758}, {740},
{735}, {700}, {742}, {747}, {752},
{734}, {743}, {732}, {746}, {770},
{780}, {710}, {731}, {712}, {712},
{741}, {770}, {770}, {754}, {718},
{670}, {775}, {749}, {795}, {756},
{741}, {787}, {721}, {745}, {782},
{765}, {780}, {811}, {790}, {836},
{743}, {858}, {739}, {762}, {770},
{752}, {763}, {795}, {792}, {746},
{786}, {785}, {774}, {786}, {718},
}
func TestFloatEncoder_Roundtrip(t *testing.T) {
s := tsm1.NewFloatEncoder()
for _, p := range TwoHoursData {
s.Push(p.v)
}
s.Finish()
b := s.Bytes()
it, err := tsm1.NewFloatDecoder(b)
if err != nil {
t.Fatalf("unexpected error creating float decoder: %v", err)
}
for _, w := range TwoHoursData {
if !it.Next() {
t.Fatalf("Next()=false, want true")
}
vv := it.Values()
// t.Logf("it.Values()=(%+v, %+v)\n", time.Unix(int64(tt), 0), vv)
if w.v != vv {
t.Errorf("Values()=(%v), want (%v)\n", vv, w.v)
}
}
if it.Next() {
t.Fatalf("Next()=true, want false")
}
if err := it.Error(); err != nil {
t.Errorf("it.Error()=%v, want nil", err)
}
}
func BenchmarkFloatEncoder(b *testing.B) {
for i := 0; i < b.N; i++ {
s := tsm1.NewFloatEncoder()
for _, tt := range TwoHoursData {
s.Push(tt.v)
}
s.Finish()
}
}
func BenchmarkFloatDecoder(b *testing.B) {
s := tsm1.NewFloatEncoder()
for _, tt := range TwoHoursData {
s.Push(tt.v)
}
s.Finish()
bytes := s.Bytes()
b.ResetTimer()
for i := 0; i < b.N; i++ {
it, err := tsm1.NewFloatDecoder(bytes)
if err != nil {
b.Fatalf("unexpected error creating float decoder: %v", err)
}
for j := 0; j < len(TwoHoursData); it.Next() {
j++
}
}
}

tsdb/engine/tsm1/int.go (new file)

@ -0,0 +1,180 @@
package tsm1
// Int64 encoding uses two different strategies depending on the range of values in
// the uncompressed data. Encoded values are first encoded using zig zag encoding.
// This interleaves positive and negative integers across a range of positive integers.
//
// For example, [-2,-1,0,1] becomes [3,1,0,2]. See
// https://developers.google.com/protocol-buffers/docs/encoding?hl=en#signed-integers
// for more information.
//
// If all the zig zag encoded values are less than 1 << 60 - 1, they are compressed using
// simple8b encoding. If any value is larger than 1 << 60 - 1, the values are stored uncompressed.
//
// Each encoded byte slice contains a 1 byte header followed by multiple 8 byte packed integers
// or 8 byte uncompressed integers. The 4 high bits of the first byte indicate the encoding type
// for the remaining bytes.
//
// There are currently two encoding types that can be used with room for 16 total. These additional
// encoding slots are reserved for future use. One improvement to be made is to use a patched
// encoding such as PFOR if only a small number of values exceed the max compressed value range. This
// should improve compression ratios with very large integers near the ends of the int64 range.
import (
"encoding/binary"
"fmt"
"github.com/jwilder/encoding/simple8b"
)
const (
// intUncompressed is an uncompressed format using 8 bytes per point
intUncompressed = 0
// intCompressedSimple is a bit-packed format using simple8b encoding
intCompressedSimple = 1
)
// Int64Encoder encodes int64s into byte slices
type Int64Encoder interface {
Write(v int64)
Bytes() ([]byte, error)
}
// Int64Decoder decodes a byte slice into int64s
type Int64Decoder interface {
Next() bool
Read() int64
Error() error
}
type int64Encoder struct {
values []uint64
}
func NewInt64Encoder() Int64Encoder {
return &int64Encoder{}
}
func (e *int64Encoder) Write(v int64) {
e.values = append(e.values, ZigZagEncode(v))
}
func (e *int64Encoder) Bytes() ([]byte, error) {
for _, v := range e.values {
// Value is too large to encode using packed format
if v > simple8b.MaxValue {
return e.encodeUncompressed()
}
}
return e.encodePacked()
}
func (e *int64Encoder) encodePacked() ([]byte, error) {
encoded, err := simple8b.EncodeAll(e.values)
if err != nil {
return nil, err
}
b := make([]byte, 1+len(encoded)*8)
// 4 high bits of first byte store the encoding type for the block
b[0] = byte(intCompressedSimple) << 4
for i, v := range encoded {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v)
}
return b, nil
}
func (e *int64Encoder) encodeUncompressed() ([]byte, error) {
b := make([]byte, 1+len(e.values)*8)
// 4 high bits of first byte store the encoding type for the block
b[0] = byte(intUncompressed) << 4
for i, v := range e.values {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v)
}
return b, nil
}
type int64Decoder struct {
values []uint64
bytes []byte
i int
n int
encoding byte
err error
}
func NewInt64Decoder(b []byte) Int64Decoder {
d := &int64Decoder{
// 240 is the maximum number of values that can be encoded into a single uint64 using simple8b
values: make([]uint64, 240),
}
d.SetBytes(b)
return d
}
func (d *int64Decoder) SetBytes(b []byte) {
if len(b) > 0 {
d.encoding = b[0] >> 4
d.bytes = b[1:]
}
d.i = 0
d.n = 0
}
func (d *int64Decoder) Next() bool {
if d.i >= d.n && len(d.bytes) == 0 {
return false
}
d.i += 1
if d.i >= d.n {
switch d.encoding {
case intUncompressed:
d.decodeUncompressed()
case intCompressedSimple:
d.decodePacked()
default:
d.err = fmt.Errorf("unknown encoding %v", d.encoding)
}
}
return d.i < d.n
}
func (d *int64Decoder) Error() error {
return d.err
}
func (d *int64Decoder) Read() int64 {
return ZigZagDecode(d.values[d.i])
}
func (d *int64Decoder) decodePacked() {
if len(d.bytes) == 0 {
return
}
v := binary.BigEndian.Uint64(d.bytes[0:8])
n, err := simple8b.Decode(d.values, v)
if err != nil {
// Should never happen; the only error that could be returned is if the value to be decoded was not
// actually encoded by the simple8b encoder.
d.err = fmt.Errorf("failed to decode value %v: %v", v, err)
}
d.n = n
d.i = 0
d.bytes = d.bytes[8:]
}
func (d *int64Decoder) decodeUncompressed() {
d.values[0] = binary.BigEndian.Uint64(d.bytes[0:8])
d.i = 0
d.n = 1
d.bytes = d.bytes[8:]
}
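
A short sketch of the encoding selection described in the header comment: when every zig zag encoded value fits under the simple8b limit, the block header nibble reports the packed format, and a single oversized value forces the 8 bytes per value layout. The helper is hypothetical and assumes package tsm1:

func int64EncodingSelection() (packed, uncompressed bool) {
	// Small values: the header nibble reports simple8b bit packing.
	small := NewInt64Encoder()
	small.Write(1)
	small.Write(2)
	if b, err := small.Bytes(); err == nil {
		packed = b[0]>>4 == intCompressedSimple
	}

	// A value whose zig zag encoding exceeds 1<<60 - 1: stored uncompressed.
	big := NewInt64Encoder()
	big.Write(1 << 61)
	if b, err := big.Bytes(); err == nil {
		uncompressed = b[0]>>4 == intUncompressed
	}
	return packed, uncompressed
}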


@ -0,0 +1,249 @@
package tsm1_test
import (
"math"
"testing"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func Test_Int64Encoder_NoValues(t *testing.T) {
enc := tsm1.NewInt64Encoder()
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewInt64Decoder(b)
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}
func Test_Int64Encoder_One(t *testing.T) {
enc := tsm1.NewInt64Encoder()
v1 := int64(1)
enc.Write(1)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewInt64Decoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
}
}
func Test_Int64Encoder_Two(t *testing.T) {
enc := tsm1.NewInt64Encoder()
var v1, v2 int64 = 1, 2
enc.Write(v1)
enc.Write(v2)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewInt64Decoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
}
}
func Test_Int64Encoder_Negative(t *testing.T) {
enc := tsm1.NewInt64Encoder()
var v1, v2, v3 int64 = -2, 0, 1
enc.Write(v1)
enc.Write(v2)
enc.Write(v3)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewInt64Decoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v3 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v3)
}
}
func Test_Int64Encoder_Large_Range(t *testing.T) {
enc := tsm1.NewInt64Encoder()
var v1, v2 int64 = math.MinInt64, math.MaxInt64
enc.Write(v1)
enc.Write(v2)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewInt64Decoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
}
}
func Test_Int64Encoder_Uncompressed(t *testing.T) {
enc := tsm1.NewInt64Encoder()
var v1, v2, v3 int64 = 0, 1, 1 << 60
enc.Write(v1)
enc.Write(v2)
enc.Write(v3)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("expected error: %v", err)
}
// 1 byte header + 3 * 8 byte values
if exp := 25; len(b) != exp {
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
}
dec := tsm1.NewInt64Decoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if v3 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v3)
}
}
func Test_Int64Encoder_AllNegative(t *testing.T) {
enc := tsm1.NewInt64Encoder()
values := []int64{
-10, -5, -1,
}
for _, v := range values {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewInt64Decoder(b)
i := 0
for dec.Next() {
if i > len(values) {
t.Fatalf("read too many values: got %v, exp %v", i, len(values))
}
if values[i] != dec.Read() {
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i])
}
i += 1
}
}
func BenchmarkInt64Encoder(b *testing.B) {
enc := tsm1.NewInt64Encoder()
x := make([]int64, 1024)
for i := 0; i < len(x); i++ {
x[i] = int64(i)
enc.Write(x[i])
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
enc.Bytes()
}
}
type byteSetter interface {
SetBytes(b []byte)
}
func BenchmarkInt64Decoder(b *testing.B) {
x := make([]int64, 1024)
enc := tsm1.NewInt64Encoder()
for i := 0; i < len(x); i++ {
x[i] = int64(i)
enc.Write(x[i])
}
bytes, _ := enc.Bytes()
b.ResetTimer()
dec := tsm1.NewInt64Decoder(bytes)
for i := 0; i < b.N; i++ {
dec.(byteSetter).SetBytes(bytes)
for dec.Next() {
}
}
}


@ -0,0 +1,94 @@
package tsm1
// String encoding packs each string into a byte slice prefixed with its variable byte
// encoded length, followed by the string bytes. The whole slice is then compressed with
// snappy, and a 1 byte header is used to indicate the type of encoding.
import (
"encoding/binary"
"fmt"
"github.com/golang/snappy"
)
const (
// stringUncompressed is an uncompressed format encoding strings as raw bytes
stringUncompressed = 0
// stringCompressedSnappy is a compressed encoding using Snappy compression
stringCompressedSnappy = 1
)
type StringEncoder interface {
Write(s string)
Bytes() ([]byte, error)
}
type StringDecoder interface {
Next() bool
Read() string
Error() error
}
type stringEncoder struct {
// The encoded bytes
bytes []byte
}
func NewStringEncoder() StringEncoder {
return &stringEncoder{}
}
func (e *stringEncoder) Write(s string) {
b := make([]byte, 10)
// Append the length of the string using variable byte encoding
i := binary.PutUvarint(b, uint64(len(s)))
e.bytes = append(e.bytes, b[:i]...)
// Append the string bytes
e.bytes = append(e.bytes, s...)
}
func (e *stringEncoder) Bytes() ([]byte, error) {
// Compress the currently appended bytes using snappy and prefix with
// a 1 byte header for future extension
data := snappy.Encode(nil, e.bytes)
return append([]byte{stringCompressedSnappy << 4}, data...), nil
}
type stringDecoder struct {
b []byte
l int
i int
err error
}
func NewStringDecoder(b []byte) (StringDecoder, error) {
// The first byte stores the encoding type. Only the snappy format exists
// currently, so ignore it for now.
data, err := snappy.Decode(nil, b[1:])
if err != nil {
return nil, fmt.Errorf("failed to decode string block: %v", err.Error())
}
return &stringDecoder{b: data}, nil
}
func (e *stringDecoder) Next() bool {
e.i += e.l
return e.i < len(e.b)
}
func (e *stringDecoder) Read() string {
// Read the length of the string
length, n := binary.Uvarint(e.b[e.i:])
// The length of this string plus the length of the variable byte encoded length
e.l = int(length) + n
return string(e.b[e.i+n : e.i+n+int(length)])
}
func (e *stringDecoder) Error() error {
return e.err
}
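// Example (sketch, illustration only, not part of this change): a minimal
// round trip through the codec above. Assumes it lives in this package; the
// exampleStringRoundTrip name is made up.
func exampleStringRoundTrip() {
	enc := NewStringEncoder()
	enc.Write("cpu")
	enc.Write("mem")

	// Bytes returns a 1-byte header followed by the snappy-compressed payload.
	b, err := enc.Bytes()
	if err != nil {
		panic(err)
	}

	dec, err := NewStringDecoder(b)
	if err != nil {
		panic(err)
	}
	for dec.Next() {
		fmt.Println(dec.Read()) // prints "cpu", then "mem"
	}
}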

View File

@ -0,0 +1,85 @@
package tsm1
import (
"fmt"
"testing"
)
func Test_StringEncoder_NoValues(t *testing.T) {
enc := NewStringEncoder()
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec, err := NewStringDecoder(b)
if err != nil {
t.Fatalf("unexpected erorr creating string decoder: %v", err)
}
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}
func Test_StringEncoder_Single(t *testing.T) {
enc := NewStringEncoder()
v1 := "v1"
enc.Write(v1)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec, err := NewStringDecoder(b)
if err != nil {
t.Fatalf("unexpected erorr creating string decoder: %v", err)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got false, exp true")
}
if v1 != dec.Read() {
t.Fatalf("unexpected value: got %v, exp %v", dec.Read(), v1)
}
}
func Test_StringEncoder_Multi_Compressed(t *testing.T) {
enc := NewStringEncoder()
values := make([]string, 10)
for i := range values {
values[i] = fmt.Sprintf("value %d", i)
enc.Write(values[i])
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != stringCompressedSnappy {
t.Fatalf("unexpected encoding: got %v, exp %v", b[0], stringCompressedSnappy)
}
if exp := 47; len(b) != exp {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
dec, err := NewStringDecoder(b)
if err != nil {
t.Fatalf("unexpected erorr creating string decoder: %v", err)
}
for i, v := range values {
if !dec.Next() {
t.Fatalf("unexpected next value: got false, exp true")
}
if v != dec.Read() {
t.Fatalf("unexpected value at pos %d: got %v, exp %v", i, dec.Read(), v)
}
}
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}

View File

@ -0,0 +1,309 @@
package tsm1
// Timestamp encoding is adaptive and based on structure of the timestamps that are encoded. It
// uses a combination of delta encoding, scaling and compression using simple8b, run length encoding
// as well as falling back to no compression if needed.
//
// Timestamp values to be encoded should be sorted before encoding. When encoded, the values are
// first delta-encoded. The first value is the starting timestamp; subsequent values are the differences
// from the prior value.
//
// Timestamps can have up to nanosecond resolution. Many timestamps are monotonically increasing
// and fall on even boundaries of time such as every 10s. When the timestamps have this structure,
// they are scaled by the largest common divisor that is also a power of 10. This has the effect
// of converting very large integer deltas into very small ones that can be reversed by multiplying them
// by the scaling factor.
//
// Using these adjusted values, if all the deltas are the same, the time range is stored using run
// length encoding. If run length encoding is not possible and all values are less than 1 << 60 - 1
// (~36.5 yrs in nanosecond resolution), then the timestamps are encoded using simple8b encoding. If
// any value exceeds the maximum, the deltas are stored uncompressed using 8 bytes each.
//
// Each compressed byte slice has a 1 byte header indicating the compression type. The 4 high bits
// indicate the encoding type. The 4 low bits are used by the encoding type.
//
// For run-length encoding, the 4 low bits store the log10 of the scaling factor. The next 8 bytes are
// the starting timestamp, the next 1-10 bytes are the delta value using variable-length encoding, and the
// final 1-10 bytes are the count of values.
//
// For simple8b encoding, the 4 low bits store the log10 of the scaling factor. The next 8 bytes are the
// first timestamp stored uncompressed, and the remaining bytes are 64-bit words containing compressed delta
// values.
//
// For uncompressed encoding, the delta values are stored using 8 bytes each.
import (
"encoding/binary"
"fmt"
"math"
"time"
"github.com/jwilder/encoding/simple8b"
)
const (
// timeUncompressed is an uncompressed format using 8 bytes per timestamp
timeUncompressed = 0
// timeCompressedPackedSimple is a bit-packed format using simple8b encoding
timeCompressedPackedSimple = 1
// timeCompressedRLE is a run-length encoding format
timeCompressedRLE = 2
)
// TimeEncoder encodes time.Time to byte slices.
type TimeEncoder interface {
Write(t time.Time)
Bytes() ([]byte, error)
}
// TimeDecoder decodes byte slices to time.Time values.
type TimeDecoder interface {
Next() bool
Read() time.Time
Error() error
}
type encoder struct {
ts []uint64
}
// NewTimeEncoder returns a TimeEncoder
func NewTimeEncoder() TimeEncoder {
return &encoder{}
}
// Write adds a time.Time to the compressed stream.
func (e *encoder) Write(t time.Time) {
e.ts = append(e.ts, uint64(t.UnixNano()))
}
func (e *encoder) reduce() (max, divisor uint64, rle bool, deltas []uint64) {
// Compute the deltas in place to avoid allocating another slice
deltas = e.ts
// Starting values for a max and divisor
max, divisor = 0, 1e12
// Indicates whether the deltas can be run-length encoded
rle = true
// Iterate in reverse so we can apply deltas in place
for i := len(deltas) - 1; i > 0; i-- {
// First differential encode the values
deltas[i] = deltas[i] - deltas[i-1]
// We also need to keep track of the max value and largest common divisor
v := deltas[i]
if v > max {
max = v
}
for {
// If our value is divisible by the current divisor, break. Otherwise, try the next smaller divisor.
if v%divisor == 0 {
break
}
divisor /= 10
}
// Skip the first iteration || see if prev == curr. The deltas can be RLE'd if they are all equal.
rle = i == len(deltas)-1 || rle && (deltas[i+1] == deltas[i])
}
return
}
// Bytes returns the encoded bytes of all written times.
func (e *encoder) Bytes() ([]byte, error) {
if len(e.ts) == 0 {
return []byte{}, nil
}
// Maximum and largest common divisor. rle is true if dts (the delta timestamps)
// are all the same.
max, div, rle, dts := e.reduce()
// The deltas are all the same, so we can run-length encode them
if rle && len(e.ts) > 60 {
return e.encodeRLE(e.ts[0], e.ts[1], div, len(e.ts))
}
// We can't compress this time-range, the deltas exceed 1 << 60
if max > simple8b.MaxValue {
return e.encodeRaw()
}
return e.encodePacked(div, dts)
}
func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) {
enc := simple8b.NewEncoder()
for _, v := range dts[1:] {
enc.Write(uint64(v) / div)
}
b := make([]byte, 8+1)
// 4 high bits used for the encoding type
b[0] = byte(timeCompressedPackedSimple) << 4
// 4 low bits are the log10 divisor
b[0] |= byte(math.Log10(float64(div)))
// The first delta value
binary.BigEndian.PutUint64(b[1:9], uint64(dts[0]))
// The compressed deltas
deltas, err := enc.Bytes()
if err != nil {
return nil, err
}
return append(b, deltas...), nil
}
func (e *encoder) encodeRaw() ([]byte, error) {
b := make([]byte, 1+len(e.ts)*8)
b[0] = byte(timeUncompressed) << 4
for i, v := range e.ts {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v))
}
return b, nil
}
func (e *encoder) encodeRLE(first, delta, div uint64, n int) ([]byte, error) {
// Large varints can take up to 10 bytes
b := make([]byte, 1+10*3)
// 4 high bits used for the encoding type
b[0] = byte(timeCompressedRLE) << 4
// 4 low bits are the log10 divisor
b[0] |= byte(math.Log10(float64(div)))
i := 1
// The first timestamp
binary.BigEndian.PutUint64(b[i:], uint64(first))
i += 8
// The first delta
i += binary.PutUvarint(b[i:], uint64(delta/div))
// The number of times the delta is repeated
i += binary.PutUvarint(b[i:], uint64(n))
return b[:i], nil
}
type decoder struct {
v time.Time
ts []uint64
err error
}
func NewTimeDecoder(b []byte) TimeDecoder {
d := &decoder{}
d.decode(b)
return d
}
func (d *decoder) Next() bool {
if len(d.ts) == 0 {
return false
}
d.v = time.Unix(0, int64(d.ts[0]))
d.ts = d.ts[1:]
return true
}
func (d *decoder) Read() time.Time {
return d.v
}
func (d *decoder) Error() error {
return d.err
}
func (d *decoder) decode(b []byte) {
if len(b) == 0 {
return
}
// Encoding type is stored in the 4 high bits of the first byte
encoding := b[0] >> 4
switch encoding {
case timeUncompressed:
d.decodeRaw(b[1:])
case timeCompressedRLE:
d.decodeRLE(b)
case timeCompressedPackedSimple:
d.decodePacked(b)
default:
d.err = fmt.Errorf("unknown encoding: %v", encoding)
}
}
func (d *decoder) decodePacked(b []byte) {
div := uint64(math.Pow10(int(b[0] & 0xF)))
first := uint64(binary.BigEndian.Uint64(b[1:9]))
enc := simple8b.NewDecoder(b[9:])
deltas := []uint64{first}
for enc.Next() {
deltas = append(deltas, enc.Read())
}
// Compute the prefix sum and scale the deltas back up
for i := 1; i < len(deltas); i++ {
dgap := deltas[i] * div
deltas[i] = deltas[i-1] + dgap
}
d.ts = deltas
}
func (d *decoder) decodeRLE(b []byte) {
var i, n int
// Lower 4 bits hold the base-10 exponent so we can scale the values back up
mod := int64(math.Pow10(int(b[i] & 0xF)))
i += 1
// Next 8 bytes is the starting timestamp
first := binary.BigEndian.Uint64(b[i : i+8])
i += 8
// Next 1-10 bytes hold the delta value (scaled down by the divisor)
value, n := binary.Uvarint(b[i:])
// Scale the value back up
value *= uint64(mod)
i += n
// Last 1-10 bytes is how many times the value repeats
count, n := binary.Uvarint(b[i:])
// Reconstruct the original values now
deltas := make([]uint64, count)
for i := range deltas {
deltas[i] = value
}
// Reverse the delta-encoding
deltas[0] = first
for i := 1; i < len(deltas); i++ {
deltas[i] = deltas[i-1] + deltas[i]
}
d.ts = deltas
}
func (d *decoder) decodeRaw(b []byte) {
d.ts = make([]uint64, len(b)/8)
for i := range d.ts {
d.ts[i] = binary.BigEndian.Uint64(b[i*8 : i*8+8])
delta := d.ts[i]
// Compute the prefix sum to rebuild the original timestamps
if i > 0 {
d.ts[i] = d.ts[i-1] + delta
}
}
}
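// Example (sketch, illustration only, not part of this change): encode three
// timestamps one second apart and read them back. With so few, evenly spaced
// values the encoder picks the simple8b-packed path; the high 4 bits of the
// first byte carry the encoding type. The exampleTimeRoundTrip name is made up.
func exampleTimeRoundTrip() {
	enc := NewTimeEncoder()
	start := time.Unix(0, 0)
	for i := 0; i < 3; i++ {
		enc.Write(start.Add(time.Duration(i) * time.Second))
	}

	b, err := enc.Bytes()
	if err != nil {
		panic(err)
	}
	fmt.Println(b[0]>>4 == timeCompressedPackedSimple) // true

	dec := NewTimeDecoder(b)
	for dec.Next() {
		fmt.Println(dec.Read().UnixNano()) // 0, 1000000000, 2000000000
	}
}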

View File

@ -0,0 +1,388 @@
package tsm1
import (
"testing"
"time"
)
func Test_TimeEncoder(t *testing.T) {
enc := NewTimeEncoder()
x := []time.Time{}
now := time.Unix(0, 0)
x = append(x, now)
enc.Write(now)
for i := 1; i < 4; i++ {
x = append(x, now.Add(time.Duration(i)*time.Second))
enc.Write(x[i])
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
for i, v := range x {
if !dec.Next() {
t.Fatalf("Next == false, expected true")
}
if v != dec.Read() {
t.Fatalf("Item %d mismatch, got %v, exp %v", i, dec.Read(), v)
}
}
}
func Test_TimeEncoder_NoValues(t *testing.T) {
enc := NewTimeEncoder()
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := NewTimeDecoder(b)
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}
func Test_TimeEncoder_One(t *testing.T) {
enc := NewTimeEncoder()
tm := time.Unix(0, 0)
enc.Write(tm)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if tm != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), tm)
}
}
func Test_TimeEncoder_Two(t *testing.T) {
enc := NewTimeEncoder()
t1 := time.Unix(0, 0)
t2 := time.Unix(0, 1)
enc.Write(t1)
enc.Write(t2)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
}
}
func Test_TimeEncoder_Three(t *testing.T) {
enc := NewTimeEncoder()
t1 := time.Unix(0, 0)
t2 := time.Unix(0, 1)
t3 := time.Unix(0, 2)
enc.Write(t1)
enc.Write(t2)
enc.Write(t3)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t3 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t3)
}
}
func Test_TimeEncoder_Large_Range(t *testing.T) {
enc := NewTimeEncoder()
t1 := time.Unix(0, 1442369134000000000)
t2 := time.Unix(0, 1442369135000000000)
enc.Write(t1)
enc.Write(t2)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
}
}
func Test_TimeEncoder_Uncompressed(t *testing.T) {
enc := NewTimeEncoder()
t1 := time.Unix(0, 0)
t2 := time.Unix(1, 0)
// about 36.5yrs in NS resolution is max range for compressed format
// This should cause the encoding to fall back to raw points
t3 := time.Unix(2, (2 << 59))
enc.Write(t1)
enc.Write(t2)
enc.Write(t3)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("expected error: %v", err)
}
if exp := 25; len(b) != exp {
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != timeUncompressed {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t1 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t2 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
}
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
if t3 != dec.Read() {
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t3)
}
}
func Test_TimeEncoder_RLE(t *testing.T) {
enc := NewTimeEncoder()
var ts []time.Time
for i := 0; i < 500; i++ {
ts = append(ts, time.Unix(int64(i), 0))
}
for _, v := range ts {
enc.Write(v)
}
b, err := enc.Bytes()
if exp := 12; len(b) != exp {
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := NewTimeDecoder(b)
for i, v := range ts {
if !dec.Next() {
t.Fatalf("Next == false, expected true")
}
if v != dec.Read() {
t.Fatalf("Item %d mismatch, got %v, exp %v", i, dec.Read(), v)
}
}
if dec.Next() {
t.Fatalf("unexpected extra values")
}
}
func Test_TimeEncoder_Reverse(t *testing.T) {
enc := NewTimeEncoder()
ts := []time.Time{
time.Unix(0, 3),
time.Unix(0, 2),
time.Unix(0, 1),
}
for _, v := range ts {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeUncompressed {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
i := 0
for dec.Next() {
if ts[i] != dec.Read() {
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), ts[i])
}
i += 1
}
}
func Test_TimeEncoder_220SecondDelta(t *testing.T) {
enc := NewTimeEncoder()
var ts []time.Time
now := time.Now()
for i := 0; i < 220; i++ {
ts = append(ts, now.Add(time.Duration(i*60)*time.Second))
}
for _, v := range ts {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Using RLE, should get 12 bytes
if exp := 12; len(b) != exp {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := NewTimeDecoder(b)
i := 0
for dec.Next() {
if ts[i] != dec.Read() {
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), ts[i])
}
i += 1
}
if i != len(ts) {
t.Fatalf("Read too few values: exp %d, got %d", len(ts), i)
}
if dec.Next() {
t.Fatalf("expecte Next() = false, got true")
}
}
func BenchmarkTimeEncoder(b *testing.B) {
enc := NewTimeEncoder()
x := make([]time.Time, 1024)
for i := 0; i < len(x); i++ {
x[i] = time.Now()
enc.Write(x[i])
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
enc.Bytes()
}
}
func BenchmarkTimeDecoder(b *testing.B) {
x := make([]time.Time, 1024)
enc := NewTimeEncoder()
for i := 0; i < len(x); i++ {
x[i] = time.Now()
enc.Write(x[i])
}
bytes, _ := enc.Bytes()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
dec := NewTimeDecoder(bytes)
b.StartTimer()
for dec.Next() {
}
}
}

tsdb/engine/tsm1/tsm1.go (new file, 1974 lines)

File diff suppressed because it is too large

File diff suppressed because it is too large

tsdb/engine/tsm1/tx.go (new file, 69 lines)
View File

@ -0,0 +1,69 @@
package tsm1
import (
"io"
"github.com/influxdb/influxdb/tsdb"
)
type tx struct {
files dataFiles
engine *Engine
}
// TODO: handle multiple fields and descending
func (t *tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
t.engine.filesLock.RLock()
defer t.engine.filesLock.RUnlock()
// don't add the overhead of the multifield cursor if we only have one field
if len(fields) == 1 {
id := t.engine.keyAndFieldToID(series, fields[0])
_, isDeleted := t.engine.deletes[id]
var indexCursor tsdb.Cursor
if isDeleted {
indexCursor = &emptyCursor{ascending: ascending}
} else {
indexCursor = newCursor(id, t.files, ascending)
}
wc := t.engine.WAL.Cursor(series, fields, dec, ascending)
return NewCombinedEngineCursor(wc, indexCursor, ascending)
}
// multiple fields. use just the MultiFieldCursor, which also handles time collisions
// so we don't need to use the combined cursor
cursors := make([]tsdb.Cursor, 0)
cursorFields := make([]string, 0)
for _, field := range fields {
id := t.engine.keyAndFieldToID(series, field)
_, isDeleted := t.engine.deletes[id]
var indexCursor tsdb.Cursor
if isDeleted {
indexCursor = &emptyCursor{ascending: ascending}
} else {
indexCursor = newCursor(id, t.files, ascending)
}
wc := t.engine.WAL.Cursor(series, []string{field}, dec, ascending)
// double up the fields since there's one for the wal and one for the index
cursorFields = append(cursorFields, field, field)
cursors = append(cursors, indexCursor, wc)
}
return NewMultiFieldCursor(cursorFields, cursors, ascending)
}
func (t *tx) Rollback() error {
t.engine.queryLock.RUnlock()
for _, f := range t.files {
f.mu.RUnlock()
}
return nil
}
// TODO: refactor the Tx interface to not have Size, Commit, or WriteTo since they're not used
func (t *tx) Size() int64 { panic("not implemented") }
func (t *tx) Commit() error { panic("not implemented") }
func (t *tx) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }

tsdb/engine/tsm1/wal.go (new file, 792 lines)
View File

@ -0,0 +1,792 @@
package tsm1
import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
"github.com/golang/snappy"
)
const (
// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
DefaultSegmentSize = 2 * 1024 * 1024
// WALFileExtension is the file extension we expect for wal segments
WALFileExtension = "wal"
WALFilePrefix = "_"
writeBufLen = 32 << 10 // 32kb
)
// flushType indicates why a flush and compaction are being run so the partition can
// do the appropriate type of compaction
type flushType int
const (
// noFlush indicates that no flush or compaction are necessary at this time
noFlush flushType = iota
// memoryFlush indicates that we should look for the series using the most
// memory to flush out and compact all others
memoryFlush
// idleFlush indicates that we should flush all series in the partition,
// delete all segment files and hold off on opening a new one
idleFlush
// deleteFlush indicates that we're flushing because series need to be removed from the WAL
deleteFlush
// startupFlush indicates that we're flushing because the database is starting up
startupFlush
)
// walEntryType is a byte written to a wal segment file that indicates what the following compressed block contains
type walEntryType byte
const (
pointsEntry walEntryType = 0x01
fieldsEntry walEntryType = 0x02
seriesEntry walEntryType = 0x03
deleteEntry walEntryType = 0x04
)
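// On disk each entry is a 1-byte walEntryType, followed by a 4-byte length of the
// compressed block, followed by the snappy-compressed block itself (see writeToLog
// and readFileToCache below).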
type Log struct {
path string
flushCheckTimer *time.Timer // check this often to see if a background flush should happen
flushCheckInterval time.Duration
// write variables
writeLock sync.Mutex
currentSegmentID int
currentSegmentFile *os.File
currentSegmentSize int
// cache and flush variables
cacheLock sync.RWMutex
lastWriteTime time.Time
flushRunning bool
cache map[string]Values
cacheDirtySort map[string]bool // this map should be small, only for dirty vals
flushCache map[string]Values // temporary map while flushing
memorySize int
measurementFieldsCache map[string]*tsdb.MeasurementFields
seriesToCreateCache []*tsdb.SeriesCreate
// LogOutput is the writer used by the logger.
LogOutput io.Writer
logger *log.Logger
// FlushColdInterval is the period of time after which a partition will do a
// full flush and compaction if it has been cold for writes.
FlushColdInterval time.Duration
// SegmentSize is the file size at which a segment file will be rotated
SegmentSize int
// FlushMemorySizeThreshold specifies when the log should be forced to be flushed
FlushMemorySizeThreshold int
// MaxMemorySizeThreshold specifies the limit at which writes to the WAL should be rejected
MaxMemorySizeThreshold int
// Index is the database that series will be flushed to
Index IndexWriter
// LoggingEnabled specifies if detailed logs should be output
LoggingEnabled bool
// SkipCache specifies if the wal should immediately write to the index instead of
// caching data in memory. False by default so we buffer in memory before flushing to index.
SkipCache bool
// SkipDurability specifies if the wal should not write the wal entries to disk.
// False by default which means all writes are durable even when cached before flushing to index.
SkipDurability bool
}
// IndexWriter is an interface for the indexed database the WAL flushes data to
type IndexWriter interface {
Write(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
MarkDeletes(keys []string)
MarkMeasurementDelete(name string)
}
func NewLog(path string) *Log {
return &Log{
path: path,
// these options should be overridden by any options in the config
LogOutput: os.Stderr,
FlushColdInterval: tsdb.DefaultFlushColdInterval,
SegmentSize: DefaultSegmentSize,
FlushMemorySizeThreshold: tsdb.DefaultFlushMemorySizeThreshold,
MaxMemorySizeThreshold: tsdb.DefaultMaxMemorySizeThreshold,
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
}
}
// Open opens and initializes the Log. It will recover from a previous unclean shutdown.
func (l *Log) Open() error {
if l.LoggingEnabled {
l.logger.Printf("tsm1 WAL starting with %d flush memory size threshold and %d max memory size threshold\n", l.FlushMemorySizeThreshold, l.MaxMemorySizeThreshold)
l.logger.Printf("tsm1 WAL writing to %s\n", l.path)
}
if err := os.MkdirAll(l.path, 0777); err != nil {
return err
}
l.cache = make(map[string]Values)
l.cacheDirtySort = make(map[string]bool)
l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields)
// flush out any WAL entries that are there from before
if err := l.readAndFlushWAL(); err != nil {
return err
}
return nil
}
// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given series and field.
// This should only ever be called by the engine cursor method, which will always give it
// exactly one field.
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
if len(fields) != 1 {
panic("wal cursor should only ever be called with 1 field")
}
ck := SeriesFieldKey(series, fields[0])
values := l.cache[ck]
// if we're in the middle of a flush, combine the previous cache
// with this one for the cursor
if l.flushCache != nil {
if fc, ok := l.flushCache[ck]; ok {
c := make([]Value, len(fc), len(fc)+len(values))
copy(c, fc)
c = append(c, values...)
return newWALCursor(Values(c).Deduplicate(), ascending)
}
}
if l.cacheDirtySort[ck] {
values = Values(values).Deduplicate()
}
// build a copy so writes afterwards don't change the result set
a := make([]Value, len(values))
copy(a, values)
return newWALCursor(a, ascending)
}
func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
// add everything to the cache, or return an error if we've hit our max memory
if addedToCache := l.addToCache(points, fields, series, true); !addedToCache {
return fmt.Errorf("WAL backed up flushing to index, hit max memory")
}
// make the write durable if specified
if !l.SkipDurability {
// write the points
pointStrings := make([]string, len(points))
for i, p := range points {
pointStrings[i] = p.String()
}
data := strings.Join(pointStrings, "\n")
compressed := snappy.Encode(nil, []byte(data))
if err := l.writeToLog(pointsEntry, compressed); err != nil {
return err
}
// write the new fields
if len(fields) > 0 {
data, err := json.Marshal(fields)
if err != nil {
return err
}
compressed = snappy.Encode(compressed, data)
if err := l.writeToLog(fieldsEntry, compressed); err != nil {
return err
}
}
// write the new series
if len(series) > 0 {
data, err := json.Marshal(series)
if err != nil {
return err
}
compressed = snappy.Encode(compressed, data)
if err := l.writeToLog(seriesEntry, compressed); err != nil {
return err
}
}
}
// usually skipping the cache is only for testing purposes and this was the easiest
// way to represent the logic (to cache and then immediately flush)
if l.SkipCache {
l.flush(idleFlush)
}
return nil
}
// addToCache will add the points, measurements, and fields to the cache and return true if successful. They will be queryable
// immediately after return and will be flushed at the next flush cycle. Before adding to the cache we check if we're over the
// max memory threshold. If we are we request a flush in a new goroutine and return false, indicating we didn't add the values
// to the cache and that writes should return a failure.
func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate, checkMemory bool) bool {
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
// if we should check memory and we're over the threshold, mark a flush as running and kick one off in a goroutine
if checkMemory && l.memorySize > l.FlushMemorySizeThreshold {
if !l.flushRunning {
l.flushRunning = true
go l.flush(memoryFlush)
}
if l.memorySize > l.MaxMemorySizeThreshold {
return false
}
}
for _, p := range points {
for name, value := range p.Fields() {
k := SeriesFieldKey(string(p.Key()), name)
v := NewValue(p.Time(), value)
cacheValues := l.cache[k]
// only mark it as dirty if it isn't already
if _, ok := l.cacheDirtySort[k]; !ok && len(cacheValues) > 0 {
dirty := cacheValues[len(cacheValues)-1].Time().UnixNano() >= v.Time().UnixNano()
if dirty {
l.cacheDirtySort[k] = true
}
}
l.memorySize += v.Size()
l.cache[k] = append(cacheValues, v)
}
}
for k, v := range fields {
l.measurementFieldsCache[k] = v
}
l.seriesToCreateCache = append(l.seriesToCreateCache, series...)
l.lastWriteTime = time.Now()
return true
}
func (l *Log) LastWriteTime() time.Time {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
return l.lastWriteTime
}
// readAndFlushWAL is called on open and will read the segment files in, flushing whenever
// the memory gets over the limit. Once all files have been read it will flush and remove the files
func (l *Log) readAndFlushWAL() error {
files, err := l.segmentFileNames()
if err != nil {
return err
}
// read all the segment files and cache them, flushing along the way if we
// hit memory limits
for _, fn := range files {
if err := l.readFileToCache(fn); err != nil {
return err
}
if l.memorySize > l.MaxMemorySizeThreshold {
if err := l.flush(memoryFlush); err != nil {
return err
}
}
}
// now flush and remove all the old files
if err := l.flush(startupFlush); err != nil {
return err
}
return nil
}
func (l *Log) readFileToCache(fileName string) error {
f, err := os.OpenFile(fileName, os.O_RDONLY, 0666)
if err != nil {
return err
}
defer f.Close()
buf := make([]byte, writeBufLen)
data := make([]byte, writeBufLen)
for {
// read the type and the length of the entry
_, err := io.ReadFull(f, buf[0:5])
if err == io.EOF {
return nil
} else if err != nil {
l.logger.Printf("error reading segment file %s: %s", fileName, err.Error())
return err
}
entryType := buf[0]
length := btou32(buf[1:5])
// read the compressed block and decompress it
if int(length) > len(buf) {
buf = make([]byte, length)
}
_, err = io.ReadFull(f, buf[0:length])
if err == io.EOF || err == io.ErrUnexpectedEOF {
l.logger.Printf("hit end of file while reading compressed wal entry from %s", fileName)
return nil
} else if err != nil {
return err
}
data, err = snappy.Decode(data, buf[0:length])
if err != nil {
l.logger.Printf("error decoding compressed entry from %s: %s", fileName, err.Error())
return nil
}
// unmarshal it and send it to the cache
switch walEntryType(entryType) {
case pointsEntry:
points, err := models.ParsePoints(data)
if err != nil {
return err
}
l.addToCache(points, nil, nil, false)
case fieldsEntry:
fields := make(map[string]*tsdb.MeasurementFields)
if err := json.Unmarshal(data, &fields); err != nil {
return err
}
l.addToCache(nil, fields, nil, false)
case seriesEntry:
series := make([]*tsdb.SeriesCreate, 0)
if err := json.Unmarshal(data, &series); err != nil {
return err
}
l.addToCache(nil, nil, series, false)
case deleteEntry:
d := &deleteData{}
if err := json.Unmarshal(data, &d); err != nil {
return err
}
l.Index.MarkDeletes(d.Keys)
l.Index.MarkMeasurementDelete(d.MeasurementName)
l.deleteKeysFromCache(d.Keys)
if d.MeasurementName != "" {
l.deleteMeasurementFromCache(d.MeasurementName)
}
}
}
}
func (l *Log) writeToLog(writeType walEntryType, data []byte) error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.currentSegmentFile == nil || l.currentSegmentSize > DefaultSegmentSize {
if err := l.newSegmentFile(); err != nil {
// fail hard since we can't write data
panic(fmt.Sprintf("error opening new segment file for wal: %s", err.Error()))
}
}
// The panics here are an intentional choice. Based on reports from users
// it's better to fail hard if the database can't take writes. Then they'll
// get alerted and fix whatever is broken. Remove these and face Paul's wrath.
if _, err := l.currentSegmentFile.Write([]byte{byte(writeType)}); err != nil {
panic(fmt.Sprintf("error writing type to wal: %s", err.Error()))
}
if _, err := l.currentSegmentFile.Write(u32tob(uint32(len(data)))); err != nil {
panic(fmt.Sprintf("error writing len to wal: %s", err.Error()))
}
if _, err := l.currentSegmentFile.Write(data); err != nil {
panic(fmt.Sprintf("error writing data to wal: %s", err.Error()))
}
l.currentSegmentSize += 5 + len(data)
return l.currentSegmentFile.Sync()
}
// Flush will force a flush of the WAL to the index
func (l *Log) Flush() error {
return l.flush(idleFlush)
}
func (l *Log) DeleteMeasurement(measurement string, keys []string) error {
d := &deleteData{MeasurementName: measurement, Keys: keys}
err := l.writeDeleteEntry(d)
if err != nil {
return err
}
l.deleteKeysFromCache(keys)
l.deleteMeasurementFromCache(measurement)
return nil
}
func (l *Log) deleteMeasurementFromCache(name string) {
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
delete(l.measurementFieldsCache, name)
}
func (l *Log) writeDeleteEntry(d *deleteData) error {
js, err := json.Marshal(d)
if err != nil {
return err
}
data := snappy.Encode(nil, js)
return l.writeToLog(deleteEntry, data)
}
func (l *Log) DeleteSeries(keys []string) error {
l.deleteKeysFromCache(keys)
return l.writeDeleteEntry(&deleteData{Keys: keys})
}
func (l *Log) deleteKeysFromCache(keys []string) {
seriesKeys := make(map[string]bool)
for _, k := range keys {
series, _ := seriesAndFieldFromCompositeKey(k)
seriesKeys[series] = true
}
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
for _, k := range keys {
delete(l.cache, k)
}
// now remove any of these that are marked for creation
var seriesCreate []*tsdb.SeriesCreate
for _, sc := range l.seriesToCreateCache {
if _, ok := seriesKeys[sc.Series.Key]; !ok {
seriesCreate = append(seriesCreate, sc)
}
}
l.seriesToCreateCache = seriesCreate
}
// Close will finish any flush that is currently in process and close file handles
func (l *Log) Close() error {
l.writeLock.Lock()
l.cacheLock.Lock()
defer l.writeLock.Unlock()
defer l.cacheLock.Unlock()
l.cache = nil
l.measurementFieldsCache = nil
l.seriesToCreateCache = nil
if l.currentSegmentFile == nil {
return nil
}
if err := l.currentSegmentFile.Close(); err != nil {
return err
}
l.currentSegmentFile = nil
return nil
}
// close all the open Log partitions and file handles
func (l *Log) close() error {
l.cache = nil
l.cacheDirtySort = nil
if l.currentSegmentFile == nil {
return nil
}
if err := l.currentSegmentFile.Close(); err != nil {
return err
}
l.currentSegmentFile = nil
return nil
}
// flush writes all wal data in memory to the index
func (l *Log) flush(flush flushType) error {
// only flush if there isn't one already running. Memory flushes are only triggered
// by writes, which will have already marked the flush as running, so those are let through.
l.cacheLock.Lock()
if l.flushRunning && flush != memoryFlush {
l.cacheLock.Unlock()
return nil
}
// mark the flush as running and ensure that it gets marked as not running when we return
l.flushRunning = true
defer func() {
l.cacheLock.Lock()
l.flushRunning = false
l.cacheLock.Unlock()
}()
// only hold the lock while we rotate the segment file
l.writeLock.Lock()
lastFileID := l.currentSegmentID
// if it's an idle flush, don't open a new segment file
if flush == idleFlush {
if l.currentSegmentFile != nil {
if err := l.currentSegmentFile.Close(); err != nil {
return err
}
l.currentSegmentFile = nil
l.currentSegmentSize = 0
}
} else {
if err := l.newSegmentFile(); err != nil {
// there's no recovering from this, fail hard
panic(fmt.Sprintf("error creating new wal file: %s", err.Error()))
}
}
l.writeLock.Unlock()
// copy the cache items to new maps so we can empty them out
l.flushCache = make(map[string]Values)
valueCount := 0
for key, v := range l.cache {
l.flushCache[key] = v
valueCount += len(v)
}
l.cache = make(map[string]Values)
for k := range l.cacheDirtySort {
l.flushCache[k] = l.flushCache[k].Deduplicate()
}
l.cacheDirtySort = make(map[string]bool)
flushSize := l.memorySize
// reset the memory being used by the cache
l.memorySize = 0
// reset the measurements for flushing
mfc := l.measurementFieldsCache
l.measurementFieldsCache = make(map[string]*tsdb.MeasurementFields)
// reset the series for flushing
scc := l.seriesToCreateCache
l.seriesToCreateCache = nil
l.cacheLock.Unlock()
// exit if there's nothing to flush to the index
if len(l.flushCache) == 0 && len(mfc) == 0 && len(scc) == 0 && flush != startupFlush {
return nil
}
if l.LoggingEnabled {
ftype := "idle"
if flush == memoryFlush {
ftype = "memory"
} else if flush == startupFlush {
ftype = "startup"
}
l.logger.Printf("%s flush of %s with %d keys and %d total values of %d bytes\n", ftype, l.path, len(l.flushCache), valueCount, flushSize)
}
startTime := time.Now()
if err := l.Index.Write(l.flushCache, mfc, scc); err != nil {
return err
}
if l.LoggingEnabled {
l.logger.Printf("%s flush to index took %s\n", l.path, time.Since(startTime))
}
l.cacheLock.Lock()
l.flushCache = nil
l.cacheLock.Unlock()
// remove all the old segment files
fileNames, err := l.segmentFileNames()
if err != nil {
return err
}
for _, fn := range fileNames {
id, err := idFromFileName(fn)
if err != nil {
return err
}
if id <= lastFileID {
err := os.Remove(fn)
if err != nil {
return err
}
}
}
return nil
}
// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID
func (l *Log) segmentFileNames() ([]string, error) {
names, err := filepath.Glob(filepath.Join(l.path, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))
if err != nil {
return nil, err
}
sort.Strings(names)
return names, nil
}
// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log
func (l *Log) newSegmentFile() error {
l.currentSegmentID += 1
if l.currentSegmentFile != nil {
if err := l.currentSegmentFile.Close(); err != nil {
return err
}
}
fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
ff, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
l.currentSegmentSize = 0
l.currentSegmentFile = ff
return nil
}
// shouldFlush will return the flushType specifying whether we should flush. memoryFlush
// is never returned from this function since those can only be triggered by writes
func (l *Log) shouldFlush() flushType {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
if l.flushRunning {
return noFlush
}
if len(l.cache) == 0 {
return noFlush
}
if time.Since(l.lastWriteTime) > l.FlushColdInterval {
return idleFlush
}
return noFlush
}
// cursor is a unidirectional iterator for a given entry in the cache
type walCursor struct {
cache Values
position int
ascending bool
}
func newWALCursor(cache Values, ascending bool) *walCursor {
// position is set such that a call to Next will successfully advance
// to the next position and return the value.
c := &walCursor{cache: cache, ascending: ascending, position: -1}
if !ascending {
c.position = len(c.cache)
}
return c
}
func (c *walCursor) Ascending() bool { return c.ascending }
// SeekTo will point the cursor to the given time (or key)
func (c *walCursor) SeekTo(seek int64) (int64, interface{}) {
// Seek cache index
c.position = sort.Search(len(c.cache), func(i int) bool {
return c.cache[i].Time().UnixNano() >= seek
})
// If seek is not in the cache, return the last value in the cache
if !c.ascending && c.position >= len(c.cache) {
c.position = len(c.cache) - 1
}
// Make sure our position points to something in the cache
if c.position < 0 || c.position >= len(c.cache) {
return tsdb.EOF, nil
}
v := c.cache[c.position]
return v.Time().UnixNano(), v.Value()
}
// Next moves the cursor to the next key/value and will return nil if at the end
func (c *walCursor) Next() (int64, interface{}) {
var v Value
if c.ascending {
v = c.nextForward()
} else {
v = c.nextReverse()
}
return v.Time().UnixNano(), v.Value()
}
// nextForward advances the cursor forward returning the next value
func (c *walCursor) nextForward() Value {
c.position++
if c.position >= len(c.cache) {
return &EmptyValue{}
}
return c.cache[c.position]
}
// nextReverse advances the cursor backwards returning the next value
func (c *walCursor) nextReverse() Value {
c.position--
if c.position < 0 {
return &EmptyValue{}
}
return c.cache[c.position]
}
// deleteData holds the information for a delete entry
type deleteData struct {
// MeasurementName will be empty for deletes that are only against series
MeasurementName string
Keys []string
}
// idFromFileName parses the segment file ID from its name
func idFromFileName(name string) (int, error) {
parts := strings.Split(filepath.Base(name), ".")
if len(parts) != 2 {
return 0, fmt.Errorf("file %s has wrong name format to have an id", name)
}
id, err := strconv.ParseUint(parts[0][1:], 10, 32)
return int(id), err
}
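// The sketch below (illustration only, not part of this change) shows the minimal
// wiring needed to use the Log: implement IndexWriter to receive flushed data,
// point the Log at a directory, and write points. The noopIndex type, the
// exampleWALUsage function, and the directory path are made up for the example.
type noopIndex struct{}

func (noopIndex) Write(valuesByKey map[string]Values, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
	return nil // a real index would persist the flushed values here
}
func (noopIndex) MarkDeletes(keys []string)         {}
func (noopIndex) MarkMeasurementDelete(name string) {}

func exampleWALUsage(points []models.Point) error {
	l := NewLog("/tmp/tsm1-wal-example")
	l.Index = noopIndex{}
	if err := l.Open(); err != nil {
		return err
	}
	defer l.Close()

	// Points land in the in-memory cache and, unless SkipDurability is set,
	// are also appended to the current segment file.
	return l.WritePoints(points, nil, nil)
}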

View File

@ -0,0 +1,178 @@
package tsm1_test
import (
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func TestWAL_TestWriteQueryOpen(t *testing.T) {
w := NewWAL()
defer w.Cleanup()
var vals map[string]tsm1.Values
var fields map[string]*tsdb.MeasurementFields
var series []*tsdb.SeriesCreate
w.Index = &MockIndexWriter{
fn: func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
vals = valuesByKey
fields = measurementFieldsToSave
series = seriesToCreate
return nil
},
}
if err := w.Open(); err != nil {
t.Fatalf("error opening: %s", err.Error())
}
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=B value=1.2 1000000000")
p3 := parsePoint("cpu,host=A value=2.1 2000000000")
p4 := parsePoint("cpu,host=B value=2.2 2000000000")
fieldsToWrite := map[string]*tsdb.MeasurementFields{"foo": {Fields: map[string]*tsdb.Field{"bar": {Name: "value"}}}}
seriesToWrite := []*tsdb.SeriesCreate{{Measurement: "asdf"}}
if err := w.WritePoints([]models.Point{p1, p2}, fieldsToWrite, seriesToWrite); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
fieldNames := []string{"value"}
var codec *tsdb.FieldCodec
c := w.Cursor("cpu,host=A", fieldNames, codec, true)
k, v := c.Next()
if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
}
if 1.1 != v {
t.Fatal("p1 data not equal")
}
c = w.Cursor("cpu,host=B", fieldNames, codec, true)
k, v = c.Next()
if k != p2.UnixNano() {
t.Fatalf("p2 time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
}
if 1.2 != v {
t.Fatal("p2 data not equal")
}
k, v = c.Next()
if k != tsdb.EOF {
t.Fatal("expected EOF", k, v)
}
// ensure we can do another write to the wal and get stuff
if err := w.WritePoints([]models.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
c = w.Cursor("cpu,host=A", fieldNames, codec, true)
k, v = c.Next()
if k != p1.UnixNano() {
t.Fatalf("p1 time wrong:\n\texp:%d\n\tgot:%d\n", p1.UnixNano(), k)
}
if 1.1 != v {
t.Fatal("p1 data not equal")
}
k, v = c.Next()
if k != p3.UnixNano() {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k)
}
if 2.1 != v {
t.Fatal("p3 data not equal")
}
// ensure we can seek
k, v = c.SeekTo(2000000000)
if k != p3.UnixNano() {
t.Fatalf("p3 time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k)
}
if 2.1 != v {
t.Fatal("p3 data not equal")
}
k, v = c.Next()
if k != tsdb.EOF {
t.Fatal("expected EOF")
}
// ensure that closing and then reopening flushes the data to the index
if err := w.Close(); err != nil {
t.Fatalf("failed to close: %s", err.Error())
}
if err := w.Open(); err != nil {
t.Fatalf("failed to open: %s", err.Error())
}
if len(vals[tsm1.SeriesFieldKey("cpu,host=A", "value")]) != 2 {
t.Fatal("expected host A values to flush to index on open")
}
if len(vals[tsm1.SeriesFieldKey("cpu,host=B", "value")]) != 1 {
t.Fatal("expected host B values to flush to index on open")
}
if err := w.WritePoints([]models.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
c = w.Cursor("cpu,host=B", fieldNames, codec, true)
k, v = c.Next()
if k != p4.UnixNano() {
t.Fatalf("p4 time wrong:\n\texp:%d\n\tgot:%d\n", p4.UnixNano(), k)
}
if 2.2 != v {
t.Fatal("p4 data not equal")
}
if !reflect.DeepEqual(fields, fieldsToWrite) {
t.Fatal("fields not flushed")
}
if !reflect.DeepEqual(series, seriesToWrite) {
t.Fatal("series not flushed")
}
}
type Log struct {
*tsm1.Log
path string
}
func NewWAL() *Log {
dir, err := ioutil.TempDir("", "tsm1-test")
if err != nil {
panic("couldn't get temp dir")
}
l := &Log{
Log: tsm1.NewLog(dir),
path: dir,
}
l.LoggingEnabled = true
return l
}
func (l *Log) Cleanup() error {
l.Close()
os.RemoveAll(l.path)
return nil
}
type MockIndexWriter struct {
fn func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
func (m *MockIndexWriter) Write(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return m.fn(valuesByKey, measurementFieldsToSave, seriesToCreate)
}
func (m *MockIndexWriter) MarkDeletes(keys []string) {}
func (m *MockIndexWriter) MarkMeasurementDelete(name string) {}

View File

@ -0,0 +1,96 @@
package tsm1
import (
"reflect"
"sync"
)
// WriteLock is a lock that enables locking of ranges between a
// min and max value. We use this so that flushes from the WAL
// can occur concurrently along with compactions.
type WriteLock struct {
rangesLock sync.Mutex
ranges []*rangeLock
}
// LockRange will ensure an exclusive lock between the min and
// max values inclusive. Any subsequent calls that have
// an overlapping range will have to wait until the previous
// lock is released. A corresponding call to UnlockRange should
// be deferred.
func (w *WriteLock) LockRange(min, max int64) {
r := &rangeLock{min: min, max: max}
for {
ranges := w.currentlyLockedRanges()
// ensure there are no currently locked ranges that overlap
for _, rr := range ranges {
if rr.overlaps(r) {
// wait until it gets unlocked
rr.mu.Lock()
// release the lock so the object can get GC'd
rr.mu.Unlock()
}
}
// ensure that no one else got a lock on the range while we
// were waiting
w.rangesLock.Lock()
if len(w.ranges) == 0 || reflect.DeepEqual(ranges, w.ranges) {
// and lock the range
r.mu.Lock()
// now that we know the range is free, add it to the locks
w.ranges = append(w.ranges, r)
w.rangesLock.Unlock()
return
}
// try again
w.rangesLock.Unlock()
}
}
// UnlockRange will release a previously locked range.
func (w *WriteLock) UnlockRange(min, max int64) {
w.rangesLock.Lock()
defer w.rangesLock.Unlock()
// take the range out of the slice and unlock it
a := make([]*rangeLock, 0)
for _, r := range w.ranges {
if r.min == min && r.max == max {
r.mu.Unlock()
continue
}
a = append(a, r)
}
w.ranges = a
}
func (w *WriteLock) currentlyLockedRanges() []*rangeLock {
w.rangesLock.Lock()
defer w.rangesLock.Unlock()
a := make([]*rangeLock, len(w.ranges))
copy(a, w.ranges)
return a
}
type rangeLock struct {
mu sync.Mutex
min int64
max int64
}
func (r *rangeLock) overlaps(l *rangeLock) bool {
if l.min >= r.min && l.min <= r.max {
return true
} else if l.max >= r.min && l.max <= r.max {
return true
} else if l.min <= r.min && l.max >= r.max {
return true
} else if l.min >= r.min && l.max <= r.max {
return true
}
return false
}
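// Sketch (illustration only, not part of this change): the typical call pattern
// pairs LockRange with a deferred UnlockRange over the time range being flushed
// or compacted. The withRangeLock helper is made up for the example.
func withRangeLock(w *WriteLock, minTime, maxTime int64, fn func()) {
	w.LockRange(minTime, maxTime)
	defer w.UnlockRange(minTime, maxTime)
	fn()
}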

View File

@ -0,0 +1,131 @@
package tsm1_test
import (
// "sync"
"testing"
"time"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func TestWriteLock_FullCover(t *testing.T) {
w := &tsm1.WriteLock{}
w.LockRange(2, 10)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(1, 11)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_RightIntersect(t *testing.T) {
w := &tsm1.WriteLock{}
w.LockRange(2, 10)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(5, 15)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_LeftIntersect(t *testing.T) {
w := &tsm1.WriteLock{}
w.LockRange(1, 4)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(1, 11)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_Inside(t *testing.T) {
w := &tsm1.WriteLock{}
w.LockRange(4, 8)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(1, 11)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_Same(t *testing.T) {
w := &tsm1.WriteLock{}
w.LockRange(2, 10)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(2, 10)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
// func TestWriteLock_FreeRangeWithContentionElsewhere(t *testing.T) {
// w := &tsm1.WriteLock{}
// w.LockRange(2, 10)
// lock := make(chan bool)
// freeRange := make(chan bool)
// timeout := time.NewTimer(10 * time.Millisecond)
// var wg sync.WaitGroup
// wg.Add(1)
// go func() {
// wg.Done()
// w.LockRange(4, 12)
// lock <- true
// }()
// // make sure the other go func has gotten to the point of requesting the lock
// wg.Wait()
// go func() {
// w.LockRange(15, 23)
// freeRange <- true
// }()
// select {
// case <-lock:
// t.Fatal("able to get lock when we shouldn't")
// case <-timeout.C:
// t.Fatal("unable to get lock of free range when contention exists elsewhere")
// case <-freeRange:
// // we're all good
// }
// }

View File

@ -16,7 +16,6 @@ import (
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb/internal"
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
)
@ -49,7 +48,6 @@ var (
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
db *bolt.DB // underlying data store
index *DatabaseIndex
path string
walPath string
@ -91,6 +89,12 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
// Path returns the path set on the shard when it was created.
func (s *Shard) Path() string { return s.path }
// PerformMaintenance gets called periodically to have the engine perform
// any maintenance tasks like WAL flushing and compaction
func (s *Shard) PerformMaintenance() {
s.engine.PerformMaintenance()
}
// open initializes and opens the shard's store.
func (s *Shard) Open() error {
if err := func() error {
@ -121,7 +125,7 @@ func (s *Shard) Open() error {
}
// Load metadata index.
if err := s.engine.LoadMetadataIndex(s.index, s.measurementFields); err != nil {
if err := s.engine.LoadMetadataIndex(s, s.index, s.measurementFields); err != nil {
return fmt.Errorf("load metadata index: %s", err)
}
@ -229,27 +233,30 @@ func (s *Shard) WritePoints(points []models.Point) error {
}
// make sure all data is encoded before attempting to save to bolt
for _, p := range points {
// Ignore if raw data has already been marshaled.
if p.Data() != nil {
continue
}
// only required for the b1 and bz1 formats
if s.engine.Format() != TSM1Format {
for _, p := range points {
// Ignore if raw data has already been marshaled.
if p.Data() != nil {
continue
}
// This was populated earlier, don't need to validate that it's there.
s.mu.RLock()
mf := s.measurementFields[p.Name()]
s.mu.RUnlock()
// This was populated earlier, don't need to validate that it's there.
s.mu.RLock()
mf := s.measurementFields[p.Name()]
s.mu.RUnlock()
// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
return ErrFieldNotFound
}
// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
return ErrFieldNotFound
}
data, err := mf.Codec.EncodeFields(p.Fields())
if err != nil {
return err
data, err := mf.Codec.EncodeFields(p.Fields())
if err != nil {
return err
}
p.SetData(data)
}
p.SetData(data)
}
// Write to the engine.
@ -360,7 +367,9 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[
measurementsToSave[f.Measurement] = m
// add the field to the in memory index
if err := m.CreateFieldIfNotExists(f.Field.Name, f.Field.Type); err != nil {
// only limit the field count for non-tsm engines
limitFieldCount := s.engine.Format() == B1Format || s.engine.Format() == BZ1Format
if err := m.CreateFieldIfNotExists(f.Field.Name, f.Field.Type, limitFieldCount); err != nil {
return nil, err
}
@ -468,7 +477,7 @@ func (m *MeasurementFields) UnmarshalBinary(buf []byte) error {
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType) error {
func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error {
// Ignore if the field already exists.
if f := m.Fields[name]; f != nil {
if f.Type != typ {
@ -477,8 +486,8 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.Dat
return nil
}
// Only 255 fields are allowed. If we go over that then return an error.
if len(m.Fields)+1 > math.MaxUint8 {
// If we're supposed to limit the number of fields, only 255 are allowed. If we go over that then return an error.
if len(m.Fields)+1 > math.MaxUint8 && limitCount {
return ErrFieldOverflow
}
@ -741,15 +750,22 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
// DecodeByName scans a byte slice for a field with the given name, converts it to its
// expected type, and return that value.
func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) {
fi := f.fieldByName(name)
fi := f.FieldByName(name)
if fi == nil {
return 0, ErrFieldNotFound
}
return f.DecodeByID(fi.ID, b)
}
func (f *FieldCodec) Fields() (a []*Field) {
for _, f := range f.fieldsByID {
a = append(a, f)
}
return
}
// FieldByName returns the field by its name. It will return a nil if not found
func (f *FieldCodec) fieldByName(name string) *Field {
func (f *FieldCodec) FieldByName(name string) *Field {
return f.fieldsByName[name]
}

View File

@ -8,7 +8,6 @@ import (
"path/filepath"
"time"
"github.com/boltdb/bolt"
"github.com/influxdb/influxdb/snapshot"
)
@ -83,7 +82,7 @@ func appendShardSnapshotFile(sw *snapshot.Writer, sh *Shard, name string) error
}
// Begin transaction.
tx, err := sh.db.Begin(false)
tx, err := sh.engine.Begin(false)
if err != nil {
return fmt.Errorf("begin: %s", err)
}
@ -103,7 +102,7 @@ func appendShardSnapshotFile(sw *snapshot.Writer, sh *Shard, name string) error
// boltTxCloser wraps a Bolt transaction to implement io.Closer.
type boltTxCloser struct {
*bolt.Tx
Tx
}
// Close rolls back the transaction.

View File

@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
@ -27,6 +28,11 @@ func NewStore(path string) *Store {
var (
ErrShardNotFound = fmt.Errorf("shard not found")
ErrStoreClosed = fmt.Errorf("store is closed")
)
const (
MaintenanceCheckInterval = time.Minute
)
type Store struct {
@ -38,7 +44,10 @@ type Store struct {
EngineOptions EngineOptions
Logger *log.Logger
closing chan struct{}
closing chan struct{}
wg sync.WaitGroup
opened bool
}
// Path returns the store's root path.
@ -71,7 +80,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
select {
case <-s.closing:
return fmt.Errorf("closing")
return ErrStoreClosed
default:
}
@ -124,7 +133,7 @@ func (s *Store) DeleteShard(shardID uint64) error {
return err
}
if err := os.Remove(sh.path); err != nil {
if err := os.RemoveAll(sh.path); err != nil {
return err
}
@ -301,6 +310,41 @@ func (s *Store) loadShards() error {
}
// periodicMaintenance is the method called in a goroutine on the opening of the store
// to perform periodic maintenance of the shards.
func (s *Store) periodicMaintenance() {
t := time.NewTicker(MaintenanceCheckInterval)
for {
select {
case <-t.C:
s.performMaintenance()
case <-s.closing:
t.Stop()
return
}
}
}
// performMaintenance will loop through the shards and tell them
// to perform any maintenance tasks. Those tasks should kick off
// their own goroutines if it's anything that could take time.
func (s *Store) performMaintenance() {
s.mu.Lock()
defer s.mu.Unlock()
for _, sh := range s.shards {
s.performMaintenanceOnShard(sh)
}
}
func (s *Store) performMaintenanceOnShard(shard *Shard) {
defer func() {
if r := recover(); r != nil {
s.Logger.Printf("recovered eror in maintenance on shard %d", shard.id)
}
}()
shard.PerformMaintenance()
}
func (s *Store) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
@ -326,12 +370,22 @@ func (s *Store) Open() error {
return err
}
go s.periodicMaintenance()
s.opened = true
return nil
}
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()
select {
case <-s.closing:
return ErrStoreClosed
default:
}
sh, ok := s.shards[shardID]
if !ok {
return ErrShardNotFound
@ -367,15 +421,17 @@ func (s *Store) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.opened {
close(s.closing)
}
s.wg.Wait()
for _, sh := range s.shards {
if err := sh.Close(); err != nil {
return err
}
}
if s.closing != nil {
close(s.closing)
}
s.closing = nil
s.opened = false
s.shards = nil
s.databaseIndexes = nil