From 1b462312a9b1d00ab725751b2f821fc64a7fe17c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 30 Sep 2016 11:22:06 -0600 Subject: [PATCH 1/3] Re-use decoder pools The decoders were held onto each iterator to avoid creating them all the time. Some of them have use quite a bit of memory so they can be expensive to create when querying across many series. Intead, more them to a re-usable pool where we create the minimum that could active be in use. This reduces garbage as well as makes the iterators less expensive to create. --- tsdb/engine/tsm1/encoding.go | 163 ++++++++++++++++++------ tsdb/engine/tsm1/encoding_test.go | 8 +- tsdb/engine/tsm1/file_store.gen.go | 32 ++--- tsdb/engine/tsm1/file_store.gen.go.tmpl | 8 +- tsdb/engine/tsm1/file_store.go | 24 ++-- tsdb/engine/tsm1/file_store_test.go | 136 ++++++++++---------- tsdb/engine/tsm1/iterator.gen.go | 48 +++---- tsdb/engine/tsm1/iterator.gen.go.tmpl | 12 +- tsdb/engine/tsm1/reader.go | 40 +++--- 9 files changed, 265 insertions(+), 206 deletions(-) diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 80e86c77c8..5c9e04be99 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -3,6 +3,7 @@ package tsm1 import ( "encoding/binary" "fmt" + "runtime" "time" "github.com/influxdata/influxdb/influxql" @@ -28,22 +29,63 @@ const ( encodedBlockHeaderSize = 1 ) +func init() { + // Prime the pools with with at one encoder/decoder for each available CPU + vals := make([]interface{}, 0, runtime.NumCPU()) + for _, p := range []*pool.Generic{ + timeEncoderPool, timeDecoderPool, + integerEncoderPool, integerDecoderPool, + floatDecoderPool, floatDecoderPool, + stringEncoderPool, stringEncoderPool, + booleanEncoderPool, booleanDecoderPool, + } { + vals = vals[:0] + // Check one out to force the allocation now and hold onto it + for i := 0; i < runtime.NumCPU(); i++ { + v := p.Get(tsdb.DefaultMaxPointsPerBlock) + vals = append(vals, v) + } + // Add them all back + for _, v := range vals { + p.Put(v) + } + } +} + var ( - timeEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + // encoder pools + timeEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewTimeEncoder(sz) }) - integerEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + integerEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewIntegerEncoder(sz) }) - floatEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + floatEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewFloatEncoder() }) - stringEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + stringEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewStringEncoder(sz) }) - booleanEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + booleanEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewBooleanEncoder(sz) }) + + // decoder pools + timeDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &TimeDecoder{} + }) + integerDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &IntegerDecoder{} + }) + floatDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &FloatDecoder{} + }) + stringDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &StringDecoder{} + }) + booleanDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &BooleanDecoder{} + }) ) type Value interface { @@ -163,7 +205,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { switch blockType { case BlockFloat64: var buf []FloatValue - decoded, err := DecodeFloatBlock(block, &TimeDecoder{}, &FloatDecoder{}, &buf) + decoded, err := DecodeFloatBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -173,7 +215,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { return vals[:len(decoded)], err case BlockInteger: var buf []IntegerValue - decoded, err := DecodeIntegerBlock(block, &TimeDecoder{}, &IntegerDecoder{}, &buf) + decoded, err := DecodeIntegerBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -184,7 +226,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { case BlockBoolean: var buf []BooleanValue - decoded, err := DecodeBooleanBlock(block, &TimeDecoder{}, &BooleanDecoder{}, &buf) + decoded, err := DecodeBooleanBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -195,7 +237,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { case BlockString: var buf []StringValue - decoded, err := DecodeStringBlock(block, &TimeDecoder{}, &StringDecoder{}, &buf) + decoded, err := DecodeStringBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -276,7 +318,7 @@ cleanup: } -func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[]FloatValue) ([]FloatValue, error) { +func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) { // Block type is the next block, make sure we actually have a float block blockType := block[0] if blockType != BlockFloat64 { @@ -289,14 +331,18 @@ func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[] return nil, err } + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := floatDecoderPool.Get(0).(*FloatDecoder) + + var i int // Setup our timestamp and value decoders tdec.Init(tb) - if err := vdec.SetBytes(vb); err != nil { - return nil, err + err = vdec.SetBytes(vb) + if err != nil { + goto cleanup } // Decode both a timestamp and value - i := 0 for tdec.Next() && vdec.Next() { ts := tdec.Read() v := vdec.Values() @@ -311,15 +357,22 @@ func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[] } // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() - } - // Did float decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() + err = tdec.Error() + if err != nil { + goto cleanup } - return (*a)[:i], nil + // Did float decoding have an error? + err = vdec.Error() + if err != nil { + goto cleanup + } + +cleanup: + timeDecoderPool.Put(tdec) + floatDecoderPool.Put(vdec) + + return (*a)[:i], err } type BooleanValue struct { @@ -384,7 +437,7 @@ cleanup: return b, err } -func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a *[]BooleanValue) ([]BooleanValue, error) { +func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) { // Block type is the next block, make sure we actually have a float block blockType := block[0] if blockType != BlockBoolean { @@ -397,6 +450,9 @@ func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a return nil, err } + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := booleanDecoderPool.Get(0).(*BooleanDecoder) + // Setup our timestamp and value decoders tdec.Init(tb) vdec.SetBytes(vb) @@ -417,15 +473,21 @@ func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a } // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() + err = tdec.Error() + if err != nil { + goto cleanup } // Did boolean decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() + err = vdec.Error() + if err != nil { + goto cleanup } - return (*a)[:i], nil +cleanup: + timeDecoderPool.Put(tdec) + booleanDecoderPool.Put(vdec) + + return (*a)[:i], err } type IntegerValue struct { @@ -481,7 +543,7 @@ cleanup: return b, err } -func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a *[]IntegerValue) ([]IntegerValue, error) { +func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) { blockType := block[0] if blockType != BlockInteger { return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockInteger, blockType) @@ -495,6 +557,9 @@ func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a return nil, err } + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := integerDecoderPool.Get(0).(*IntegerDecoder) + // Setup our timestamp and value decoders tdec.Init(tb) vdec.SetBytes(vb) @@ -515,15 +580,21 @@ func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a } // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() + err = tdec.Error() + if err != nil { + goto cleanup } // Did int64 decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() + err = vdec.Error() + if err != nil { + goto cleanup } - return (*a)[:i], nil +cleanup: + timeDecoderPool.Put(tdec) + integerDecoderPool.Put(vdec) + + return (*a)[:i], err } type StringValue struct { @@ -579,7 +650,7 @@ cleanup: return b, err } -func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *[]StringValue) ([]StringValue, error) { +func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) { blockType := block[0] if blockType != BlockString { return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType) @@ -593,14 +664,18 @@ func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a * return nil, err } + var i int + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := stringDecoderPool.Get(0).(*StringDecoder) + // Setup our timestamp and value decoders tdec.Init(tb) - if err := vdec.SetBytes(vb); err != nil { - return nil, err + err = vdec.SetBytes(vb) + if err != nil { + goto cleanup } // Decode both a timestamp and value - i := 0 for tdec.Next() && vdec.Next() { ts := tdec.Read() v := vdec.Read() @@ -615,15 +690,21 @@ func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a * } // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() + err = tdec.Error() + if err != nil { + goto cleanup } // Did string decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() + err = vdec.Error() + if err != nil { + goto cleanup } - return (*a)[:i], nil +cleanup: + timeDecoderPool.Put(tdec) + stringDecoderPool.Put(vdec) + + return (*a)[:i], err } func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte { diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index 7d6ea7170f..b4b384da1c 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -1095,7 +1095,7 @@ func BenchmarkDecodeBlock_Float_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.FloatValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeFloatBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &decodedValues) + _, err = tsm1.DecodeFloatBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } @@ -1164,7 +1164,7 @@ func BenchmarkDecodeBlock_Integer_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.IntegerValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeIntegerBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &decodedValues) + _, err = tsm1.DecodeIntegerBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } @@ -1233,7 +1233,7 @@ func BenchmarkDecodeBlock_Boolean_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.BooleanValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeBooleanBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &decodedValues) + _, err = tsm1.DecodeBooleanBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } @@ -1302,7 +1302,7 @@ func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.StringValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeStringBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &decodedValues) + _, err = tsm1.DecodeStringBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } diff --git a/tsdb/engine/tsm1/file_store.gen.go b/tsdb/engine/tsm1/file_store.gen.go index b30782e012..bf2ae09bd0 100644 --- a/tsdb/engine/tsm1/file_store.gen.go +++ b/tsdb/engine/tsm1/file_store.gen.go @@ -7,7 +7,7 @@ package tsm1 // ReadFloatBlock reads the next block as a set of float values. -func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[]FloatValue) ([]FloatValue, error) { +func (c *KeyCursor) ReadFloatBlock(buf *[]FloatValue) ([]FloatValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -16,7 +16,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadFloatBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadFloatBlockAt(&first.entry, buf) // Remove values we already read values = FloatValues(values).Exclude(first.readMin, first.readMax) @@ -81,7 +81,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ tombstones := cur.r.TombstoneRange(c.key) var a []FloatValue - v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadFloatBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -140,7 +140,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ tombstones := cur.r.TombstoneRange(c.key) var a []FloatValue - v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadFloatBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -167,7 +167,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ } // ReadIntegerBlock reads the next block as a set of integer values. -func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, buf *[]IntegerValue) ([]IntegerValue, error) { +func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -176,7 +176,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadIntegerBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadIntegerBlockAt(&first.entry, buf) // Remove values we already read values = IntegerValues(values).Exclude(first.readMin, first.readMax) @@ -241,7 +241,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []IntegerValue - v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadIntegerBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -300,7 +300,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []IntegerValue - v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadIntegerBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -327,7 +327,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu } // ReadStringBlock reads the next block as a set of string values. -func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf *[]StringValue) ([]StringValue, error) { +func (c *KeyCursor) ReadStringBlock(buf *[]StringValue) ([]StringValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -336,7 +336,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadStringBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadStringBlockAt(&first.entry, buf) // Remove values we already read values = StringValues(values).Exclude(first.readMin, first.readMax) @@ -401,7 +401,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf tombstones := cur.r.TombstoneRange(c.key) var a []StringValue - v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadStringBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -460,7 +460,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf tombstones := cur.r.TombstoneRange(c.key) var a []StringValue - v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadStringBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -487,7 +487,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf } // ReadBooleanBlock reads the next block as a set of boolean values. -func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, buf *[]BooleanValue) ([]BooleanValue, error) { +func (c *KeyCursor) ReadBooleanBlock(buf *[]BooleanValue) ([]BooleanValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -496,7 +496,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadBooleanBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadBooleanBlockAt(&first.entry, buf) // Remove values we already read values = BooleanValues(values).Exclude(first.readMin, first.readMax) @@ -561,7 +561,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []BooleanValue - v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadBooleanBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -620,7 +620,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []BooleanValue - v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadBooleanBlockAt(&cur.entry, &a) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/file_store.gen.go.tmpl b/tsdb/engine/tsm1/file_store.gen.go.tmpl index 799d05792a..164b6baac3 100644 --- a/tsdb/engine/tsm1/file_store.gen.go.tmpl +++ b/tsdb/engine/tsm1/file_store.gen.go.tmpl @@ -3,7 +3,7 @@ package tsm1 {{range .}} // Read{{.Name}}Block reads the next block as a set of {{.name}} values. -func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder, buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) { +func (c *KeyCursor) Read{{.Name}}Block(buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -12,7 +12,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.Read{{.Name}}BlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.Read{{.Name}}BlockAt(&first.entry, buf) // Remove values we already read values = {{.Name}}Values(values).Exclude(first.readMin, first.readMax) @@ -77,7 +77,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder tombstones := cur.r.TombstoneRange(c.key) var a []{{.Name}}Value - v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -136,7 +136,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder tombstones := cur.r.TombstoneRange(c.key) var a []{{.Name}}Value - v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index d3e5a10731..99a0c22d5a 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -28,10 +28,10 @@ type TSMFile interface { // ReadAt returns all the values in the block identified by entry. ReadAt(entry *IndexEntry, values []Value) ([]Value, error) - ReadFloatBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) - ReadIntegerBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) - ReadStringBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) - ReadBooleanBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) + ReadFloatBlockAt(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) + ReadIntegerBlockAt(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) + ReadStringBlockAt(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) + ReadBooleanBlockAt(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) // Entries returns the index entries for all blocks for the given key. Entries(key string) []IndexEntry @@ -680,14 +680,13 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error { // whether the key will be scan in ascending time order or descenging time order. // This function assumes the read-lock has been taken. func (f *FileStore) locations(key string, t int64, ascending bool) []*location { - var locations []*location - filesSnapshot := make([]TSMFile, len(f.files)) for i := range f.files { filesSnapshot[i] = f.files[i] } var entries []IndexEntry + locations := make([]*location, 0, len(filesSnapshot)) for _, fd := range filesSnapshot { minTime, maxTime := fd.TimeRange() @@ -799,13 +798,13 @@ func ParseTSMFileName(name string) (int, int, error) { id := base[:idx] - parts := strings.Split(id, "-") - if len(parts) != 2 { + idx = strings.Index(id, "-") + if idx == -1 { return 0, 0, fmt.Errorf("file %s is named incorrectly", name) } - generation, err := strconv.ParseUint(parts[0], 10, 32) - sequence, err := strconv.ParseUint(parts[1], 10, 32) + generation, err := strconv.ParseUint(id[:idx], 10, 32) + sequence, err := strconv.ParseUint(id[idx+1:], 10, 32) return int(generation), int(sequence), err } @@ -891,8 +890,9 @@ func newKeyCursor(fs *FileStore, key string, t int64, ascending bool) *KeyCursor fs: fs, seeks: fs.locations(key, t, ascending), ascending: ascending, - refs: map[string]TSMFile{}, } + c.refs = make(map[string]TSMFile, len(c.seeks)) + c.duplicates = c.hasOverlappingBlocks() if ascending { @@ -1025,7 +1025,7 @@ func (c *KeyCursor) nextAscending() { // Append the first matching block if len(c.current) == 0 { - c.current = append(c.current, &location{}) + c.current = append(c.current, nil) } else { c.current = c.current[:1] } diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index b96c094852..95dc3d7587 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -70,7 +70,7 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -110,7 +110,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -130,7 +130,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -149,7 +149,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -178,7 +178,7 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -220,7 +220,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -242,7 +242,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -287,7 +287,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -308,7 +308,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -353,7 +353,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -374,7 +374,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -419,7 +419,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -440,7 +440,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -485,7 +485,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -509,7 +509,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -529,7 +529,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -561,7 +561,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) { buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -585,7 +585,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -604,7 +604,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -636,7 +636,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) { buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -660,7 +660,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -679,7 +679,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -711,7 +711,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) { buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -735,7 +735,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -754,7 +754,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -784,7 +784,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 3, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -801,7 +801,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -840,7 +840,7 @@ func TestFileStore_SeekToAsc_End(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 2, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -879,7 +879,7 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -918,7 +918,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 2, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -936,7 +936,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -977,7 +977,7 @@ func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -998,7 +998,7 @@ func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1042,7 +1042,7 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1063,7 +1063,7 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1105,7 +1105,7 @@ func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1126,7 +1126,7 @@ func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1168,7 +1168,7 @@ func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1189,7 +1189,7 @@ func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1229,7 +1229,7 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 4, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1268,7 +1268,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 10, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1289,7 +1289,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1311,7 +1311,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1332,7 +1332,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1365,7 +1365,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 11, false) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1388,7 +1388,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1409,7 +1409,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1442,7 +1442,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 11, false) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1463,7 +1463,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1485,7 +1485,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1506,7 +1506,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1539,7 +1539,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 11, false) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1560,7 +1560,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1582,7 +1582,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1603,7 +1603,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1639,7 +1639,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 3, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1659,7 +1659,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) { } } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1680,7 +1680,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1712,7 +1712,7 @@ func TestFileStore_SeekToDesc_End(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 2, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1756,7 +1756,7 @@ func TestKeyCursor_TombstoneRange(t *testing.T) { c := fs.KeyCursor("cpu", 0, true) expValues := []int{0, 2} for _, v := range expValues { - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1799,7 +1799,7 @@ func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1843,7 +1843,7 @@ func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) { buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1887,7 +1887,7 @@ func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) { buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1931,7 +1931,7 @@ func TestKeyCursor_TombstoneRange_PartialBoolean(t *testing.T) { buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -2064,23 +2064,21 @@ func TestFileStore_Replace(t *testing.T) { } // There should be two blocks (1 in each file) - tdec := &tsm1.TimeDecoder{} - vdec := &tsm1.FloatDecoder{} cur.Next() buf := make([]tsm1.FloatValue, 10) - values, err := cur.ReadFloatBlock(tdec, vdec, &buf) + values, err := cur.ReadFloatBlock(&buf) if got, exp := len(values), 1; got != exp { t.Fatalf("value len mismatch: got %v, exp %v", got, exp) } cur.Next() - values, err = cur.ReadFloatBlock(tdec, vdec, &buf) + values, err = cur.ReadFloatBlock(&buf) if got, exp := len(values), 1; got != exp { t.Fatalf("value len mismatch: got %v, exp %v", got, exp) } // No more blocks for this cursor cur.Next() - values, err = cur.ReadFloatBlock(tdec, vdec, &buf) + values, err = cur.ReadFloatBlock(&buf) if got, exp := len(values), 0; got != exp { t.Fatalf("value len mismatch: got %v, exp %v", got, exp) } diff --git a/tsdb/engine/tsm1/iterator.gen.go b/tsdb/engine/tsm1/iterator.gen.go index 4a19c3e916..762a046799 100644 --- a/tsdb/engine/tsm1/iterator.gen.go +++ b/tsdb/engine/tsm1/iterator.gen.go @@ -311,8 +311,6 @@ type floatAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec FloatDecoder buf []FloatValue values []FloatValue pos int @@ -330,7 +328,7 @@ func newFloatAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCu c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]FloatValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -412,7 +410,7 @@ func (c *floatAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -427,8 +425,6 @@ type floatDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec FloatDecoder buf []FloatValue values []FloatValue pos int @@ -449,7 +445,7 @@ func newFloatDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyC c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]FloatValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -534,7 +530,7 @@ func (c *floatDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -758,8 +754,6 @@ type integerAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec IntegerDecoder buf []IntegerValue values []IntegerValue pos int @@ -777,7 +771,7 @@ func newIntegerAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]IntegerValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -859,7 +853,7 @@ func (c *integerAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -874,8 +868,6 @@ type integerDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec IntegerDecoder buf []IntegerValue values []IntegerValue pos int @@ -896,7 +888,7 @@ func newIntegerDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Ke c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]IntegerValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -981,7 +973,7 @@ func (c *integerDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1205,8 +1197,6 @@ type stringAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec StringDecoder buf []StringValue values []StringValue pos int @@ -1224,7 +1214,7 @@ func newStringAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyC c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]StringValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1306,7 +1296,7 @@ func (c *stringAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1321,8 +1311,6 @@ type stringDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec StringDecoder buf []StringValue values []StringValue pos int @@ -1343,7 +1331,7 @@ func newStringDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]StringValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1428,7 +1416,7 @@ func (c *stringDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1652,8 +1640,6 @@ type booleanAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec BooleanDecoder buf []BooleanValue values []BooleanValue pos int @@ -1671,7 +1657,7 @@ func newBooleanAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]BooleanValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1753,7 +1739,7 @@ func (c *booleanAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1768,8 +1754,6 @@ type booleanDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec BooleanDecoder buf []BooleanValue values []BooleanValue pos int @@ -1790,7 +1774,7 @@ func newBooleanDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Ke c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]BooleanValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1875,7 +1859,7 @@ func (c *booleanDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } diff --git a/tsdb/engine/tsm1/iterator.gen.go.tmpl b/tsdb/engine/tsm1/iterator.gen.go.tmpl index 92b22a405b..2e1f5c0c20 100644 --- a/tsdb/engine/tsm1/iterator.gen.go.tmpl +++ b/tsdb/engine/tsm1/iterator.gen.go.tmpl @@ -307,8 +307,6 @@ type {{.name}}AscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec {{.Name}}Decoder buf []{{.Name}}Value values []{{.Name}}Value pos int @@ -326,7 +324,7 @@ func new{{.Name}}AscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *K c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]{{.Name}}Value, 10) - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -408,7 +406,7 @@ func (c *{{.name}}AscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -423,8 +421,6 @@ type {{.name}}DescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec {{.Name}}Decoder buf []{{.Name}}Value values []{{.Name}}Value pos int @@ -445,7 +441,7 @@ func new{{.Name}}DescendingCursor(seek int64, cacheValues Values, tsmKeyCursor * c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]{{.Name}}Value, 10) - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -530,7 +526,7 @@ func (c *{{.name}}DescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) if len(c.tsm.values) == 0 { return } diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 9186c20024..daaa516b42 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -163,10 +163,10 @@ type blockAccessor interface { read(key string, timestamp int64) ([]Value, error) readAll(key string) ([]Value, error) readBlock(entry *IndexEntry, values []Value) ([]Value, error) - readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, fdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) - readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) - readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) - readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) + readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) + readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) + readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) + readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) rename(path string) error path() string @@ -254,30 +254,30 @@ func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { return v, err } -func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, vals *[]FloatValue) ([]FloatValue, error) { +func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) { t.mu.RLock() - v, err := t.accessor.readFloatBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readFloatBlock(entry, vals) t.mu.RUnlock() return v, err } -func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, vals *[]IntegerValue) ([]IntegerValue, error) { +func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) { t.mu.RLock() - v, err := t.accessor.readIntegerBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readIntegerBlock(entry, vals) t.mu.RUnlock() return v, err } -func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, vals *[]StringValue) ([]StringValue, error) { +func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) { t.mu.RLock() - v, err := t.accessor.readStringBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readStringBlock(entry, vals) t.mu.RUnlock() return v, err } -func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, vals *[]BooleanValue) ([]BooleanValue, error) { +func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) { t.mu.RLock() - v, err := t.accessor.readBooleanBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readBooleanBlock(entry, vals) t.mu.RUnlock() return v, err } @@ -1041,7 +1041,7 @@ func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, er return values, nil } -func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) { +func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1049,7 +1049,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec return nil, ErrTSMClosed } - a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil { @@ -1059,7 +1059,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec return a, nil } -func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) { +func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1067,7 +1067,7 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vd return nil, ErrTSMClosed } - a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil { @@ -1077,7 +1077,7 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vd return a, nil } -func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) { +func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1085,7 +1085,7 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vde return nil, ErrTSMClosed } - a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil { @@ -1095,7 +1095,7 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vde return a, nil } -func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) { +func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1103,7 +1103,7 @@ func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vd return nil, ErrTSMClosed } - a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil { From 750c8b3932b3d6e25d05c8a2f59486ac1f838a9c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 30 Sep 2016 14:15:06 -0600 Subject: [PATCH 2/3] Reduce lock contention in cache.Values The cache read lock was held for the whole duration of the call when it only needs to be held at the beginning since entries have their own locks. --- tsdb/engine/tsm1/cache.go | 122 ++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 64 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 42905e6c7f..ee75c267df 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -349,9 +349,65 @@ func (c *Cache) Keys() []string { // Values returns a copy of all values, deduped and sorted, for the given key. func (c *Cache) Values(key string) Values { + var snapshotEntries *entry c.mu.RLock() - defer c.mu.RUnlock() - return c.merged(key) + e := c.store[key] + if c.snapshot != nil { + snapshotEntries = c.snapshot.store[key] + } + c.mu.RUnlock() + + if e == nil { + if snapshotEntries == nil { + // No values in hot cache or snapshots. + return nil + } + } else { + e.deduplicate() + } + + // Build the sequence of entries that will be returned, in the correct order. + // Calculate the required size of the destination buffer. + var entries []*entry + sz := 0 + + if snapshotEntries != nil { + snapshotEntries.deduplicate() // guarantee we are deduplicated + entries = append(entries, snapshotEntries) + sz += snapshotEntries.count() + } + + if e != nil { + entries = append(entries, e) + sz += e.count() + } + + // Any entries? If not, return. + if sz == 0 { + return nil + } + + // Create the buffer, and copy all hot values and snapshots. Individual + // entries are sorted at this point, so now the code has to check if the + // resultant buffer will be sorted from start to finish. + var needSort bool + values := make(Values, sz) + n := 0 + for _, e := range entries { + e.mu.RLock() + if !needSort && n > 0 && len(e.values) > 0 { + needSort = values[n-1].UnixNano() >= e.values[0].UnixNano() + } + n += copy(values[n:], e.values) + e.mu.RUnlock() + } + values = values[:n] + + if needSort { + values = values.Deduplicate() + } + + return values } // Delete will remove the keys from the cache @@ -397,68 +453,6 @@ func (c *Cache) SetMaxSize(size uint64) { c.mu.Unlock() } -// merged returns a copy of hot and snapshot values. The copy will be merged, deduped, and -// sorted. It assumes all necessary locks have been taken. If the caller knows that the -// the hot source data for the key will not be changed, it is safe to call this function -// with a read-lock taken. Otherwise it must be called with a write-lock taken. -func (c *Cache) merged(key string) Values { - e := c.store[key] - if e == nil { - if c.snapshot == nil { - // No values in hot cache or snapshots. - return nil - } - } else { - e.deduplicate() - } - - // Build the sequence of entries that will be returned, in the correct order. - // Calculate the required size of the destination buffer. - var entries []*entry - sz := 0 - - if c.snapshot != nil { - snapshotEntries := c.snapshot.store[key] - if snapshotEntries != nil { - snapshotEntries.deduplicate() // guarantee we are deduplicated - entries = append(entries, snapshotEntries) - sz += snapshotEntries.count() - } - } - - if e != nil { - entries = append(entries, e) - sz += e.count() - } - - // Any entries? If not, return. - if sz == 0 { - return nil - } - - // Create the buffer, and copy all hot values and snapshots. Individual - // entries are sorted at this point, so now the code has to check if the - // resultant buffer will be sorted from start to finish. - var needSort bool - values := make(Values, sz) - n := 0 - for _, e := range entries { - e.mu.RLock() - if !needSort && n > 0 && len(e.values) > 0 { - needSort = values[n-1].UnixNano() >= e.values[0].UnixNano() - } - n += copy(values[n:], e.values) - e.mu.RUnlock() - } - values = values[:n] - - if needSort { - values = values.Deduplicate() - } - - return values -} - // Store returns the underlying cache store. This is not goroutine safe! // Protect access by using the Lock and Unlock functions on Cache. func (c *Cache) Store() map[string]*entry { From 20f1fb3f7f388eddd4f6f2459979c43335d5ea1e Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 3 Oct 2016 12:08:53 -0600 Subject: [PATCH 3/3] Replace gotos with anonymous functions --- tsdb/engine/tsm1/encoding.go | 407 ++++++++++++++++++----------------- 1 file changed, 210 insertions(+), 197 deletions(-) diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 5c9e04be99..1ade9c737e 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -287,35 +287,36 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { // frame-or-reference and run length encoding. tsenc := getTimeEncoder(len(values)) - for _, v := range values { - tsenc.Write(v.UnixNano()) - venc.Push(v.(*FloatValue).value) - } - venc.Finish() + var b []byte + err := func() error { + for _, v := range values { + tsenc.Write(v.UnixNano()) + venc.Push(v.(*FloatValue).value) + } + venc.Finish() - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsenc.Bytes() + if err != nil { + return err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsenc.Bytes() - if err != nil { - goto cleanup - } - // Encoded float values - vb, err = venc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + b = packBlock(buf, BlockFloat64, tb, vb) - // Prepend the first timestamp of the block in the first 8 bytes and the block - // in the next byte, followed by the block - b = packBlock(buf, BlockFloat64, tb, vb) + return nil + }() -cleanup: putTimeEncoder(tsenc) putFloatEncoder(venc) - return b, err + return b, err } func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) { @@ -335,40 +336,42 @@ func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) { vdec := floatDecoderPool.Get(0).(*FloatDecoder) var i int - // Setup our timestamp and value decoders - tdec.Init(tb) - err = vdec.SetBytes(vb) - if err != nil { - goto cleanup - } - - // Decode both a timestamp and value - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Values() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, FloatValue{ts, v}) + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + err = vdec.SetBytes(vb) + if err != nil { + return err } - i++ - } - // Did timestamp decoding have an error? - err = tdec.Error() - if err != nil { - goto cleanup - } + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Values() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, FloatValue{ts, v}) + } + i++ + } - // Did float decoding have an error? - err = vdec.Error() - if err != nil { - goto cleanup - } + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + + // Did float decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() -cleanup: timeDecoderPool.Put(tdec) floatDecoderPool.Put(vdec) @@ -408,32 +411,33 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { // Encode timestamps using an adaptive encoder tsenc := getTimeEncoder(len(values)) - for _, v := range values { - tsenc.Write(v.UnixNano()) - venc.Write(v.(*BooleanValue).value) - } + var b []byte + err := func() error { + for _, v := range values { + tsenc.Write(v.UnixNano()) + venc.Write(v.(*BooleanValue).value) + } - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsenc.Bytes() + if err != nil { + return err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsenc.Bytes() - if err != nil { - goto cleanup - } - // Encoded float values - vb, err = venc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + b = packBlock(buf, BlockBoolean, tb, vb) + return nil + }() - // Prepend the first timestamp of the block in the first 8 bytes and the block - // in the next byte, followed by the block - b = packBlock(buf, BlockBoolean, tb, vb) - -cleanup: putTimeEncoder(tsenc) putBooleanEncoder(venc) + return b, err } @@ -453,37 +457,39 @@ func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) tdec := timeDecoderPool.Get(0).(*TimeDecoder) vdec := booleanDecoderPool.Get(0).(*BooleanDecoder) - // Setup our timestamp and value decoders - tdec.Init(tb) - vdec.SetBytes(vb) + var i int + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + vdec.SetBytes(vb) - // Decode both a timestamp and value - i := 0 - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Read() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, BooleanValue{ts, v}) + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Read() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, BooleanValue{ts, v}) + } + i++ } - i++ - } - // Did timestamp decoding have an error? - err = tdec.Error() - if err != nil { - goto cleanup - } - // Did boolean decoding have an error? - err = vdec.Error() - if err != nil { - goto cleanup - } + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + // Did boolean decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() -cleanup: timeDecoderPool.Put(tdec) booleanDecoderPool.Put(vdec) @@ -515,31 +521,32 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { tsEnc := getTimeEncoder(len(values)) vEnc := getIntegerEncoder(len(values)) - for _, v := range values { - tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*IntegerValue).value) - } + var b []byte + err := func() error { + for _, v := range values { + tsEnc.Write(v.UnixNano()) + vEnc.Write(v.(*IntegerValue).value) + } - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsEnc.Bytes() + if err != nil { + return err + } + // Encoded int64 values + vb, err := vEnc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsEnc.Bytes() - if err != nil { - goto cleanup - } - // Encoded int64 values - vb, err = vEnc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes + b = packBlock(buf, BlockInteger, tb, vb) + return nil + }() - // Prepend the first timestamp of the block in the first 8 bytes - b = packBlock(buf, BlockInteger, tb, vb) - -cleanup: putTimeEncoder(tsEnc) putIntegerEncoder(vEnc) + return b, err } @@ -560,37 +567,39 @@ func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) tdec := timeDecoderPool.Get(0).(*TimeDecoder) vdec := integerDecoderPool.Get(0).(*IntegerDecoder) - // Setup our timestamp and value decoders - tdec.Init(tb) - vdec.SetBytes(vb) + var i int + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + vdec.SetBytes(vb) - // Decode both a timestamp and value - i := 0 - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Read() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, IntegerValue{ts, v}) + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Read() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, IntegerValue{ts, v}) + } + i++ } - i++ - } - // Did timestamp decoding have an error? - err = tdec.Error() - if err != nil { - goto cleanup - } - // Did int64 decoding have an error? - err = vdec.Error() - if err != nil { - goto cleanup - } + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + // Did int64 decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() -cleanup: timeDecoderPool.Put(tdec) integerDecoderPool.Put(vdec) @@ -622,31 +631,33 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { tsEnc := getTimeEncoder(len(values)) vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value)) - for _, v := range values { - tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*StringValue).value) - } + var b []byte + err := func() error { + for _, v := range values { + tsEnc.Write(v.UnixNano()) + vEnc.Write(v.(*StringValue).value) + } - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsEnc.Bytes() + if err != nil { + return err + } + // Encoded string values + vb, err := vEnc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsEnc.Bytes() - if err != nil { - goto cleanup - } - // Encoded string values - vb, err = vEnc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes + b = packBlock(buf, BlockString, tb, vb) - // Prepend the first timestamp of the block in the first 8 bytes - b = packBlock(buf, BlockString, tb, vb) + return nil + }() -cleanup: putTimeEncoder(tsEnc) putStringEncoder(vEnc) + return b, err } @@ -664,43 +675,45 @@ func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) { return nil, err } - var i int tdec := timeDecoderPool.Get(0).(*TimeDecoder) vdec := stringDecoderPool.Get(0).(*StringDecoder) - // Setup our timestamp and value decoders - tdec.Init(tb) - err = vdec.SetBytes(vb) - if err != nil { - goto cleanup - } - - // Decode both a timestamp and value - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Read() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, StringValue{ts, v}) + var i int + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + err = vdec.SetBytes(vb) + if err != nil { + return err } - i++ - } - // Did timestamp decoding have an error? - err = tdec.Error() - if err != nil { - goto cleanup - } - // Did string decoding have an error? - err = vdec.Error() - if err != nil { - goto cleanup - } + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Read() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, StringValue{ts, v}) + } + i++ + } + + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + // Did string decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() -cleanup: timeDecoderPool.Put(tdec) stringDecoderPool.Put(vdec)