diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 42905e6c7f..ee75c267df 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -349,9 +349,65 @@ func (c *Cache) Keys() []string { // Values returns a copy of all values, deduped and sorted, for the given key. func (c *Cache) Values(key string) Values { + var snapshotEntries *entry c.mu.RLock() - defer c.mu.RUnlock() - return c.merged(key) + e := c.store[key] + if c.snapshot != nil { + snapshotEntries = c.snapshot.store[key] + } + c.mu.RUnlock() + + if e == nil { + if snapshotEntries == nil { + // No values in hot cache or snapshots. + return nil + } + } else { + e.deduplicate() + } + + // Build the sequence of entries that will be returned, in the correct order. + // Calculate the required size of the destination buffer. + var entries []*entry + sz := 0 + + if snapshotEntries != nil { + snapshotEntries.deduplicate() // guarantee we are deduplicated + entries = append(entries, snapshotEntries) + sz += snapshotEntries.count() + } + + if e != nil { + entries = append(entries, e) + sz += e.count() + } + + // Any entries? If not, return. + if sz == 0 { + return nil + } + + // Create the buffer, and copy all hot values and snapshots. Individual + // entries are sorted at this point, so now the code has to check if the + // resultant buffer will be sorted from start to finish. + var needSort bool + values := make(Values, sz) + n := 0 + for _, e := range entries { + e.mu.RLock() + if !needSort && n > 0 && len(e.values) > 0 { + needSort = values[n-1].UnixNano() >= e.values[0].UnixNano() + } + n += copy(values[n:], e.values) + e.mu.RUnlock() + } + values = values[:n] + + if needSort { + values = values.Deduplicate() + } + + return values } // Delete will remove the keys from the cache @@ -397,68 +453,6 @@ func (c *Cache) SetMaxSize(size uint64) { c.mu.Unlock() } -// merged returns a copy of hot and snapshot values. The copy will be merged, deduped, and -// sorted. It assumes all necessary locks have been taken. If the caller knows that the -// the hot source data for the key will not be changed, it is safe to call this function -// with a read-lock taken. Otherwise it must be called with a write-lock taken. -func (c *Cache) merged(key string) Values { - e := c.store[key] - if e == nil { - if c.snapshot == nil { - // No values in hot cache or snapshots. - return nil - } - } else { - e.deduplicate() - } - - // Build the sequence of entries that will be returned, in the correct order. - // Calculate the required size of the destination buffer. - var entries []*entry - sz := 0 - - if c.snapshot != nil { - snapshotEntries := c.snapshot.store[key] - if snapshotEntries != nil { - snapshotEntries.deduplicate() // guarantee we are deduplicated - entries = append(entries, snapshotEntries) - sz += snapshotEntries.count() - } - } - - if e != nil { - entries = append(entries, e) - sz += e.count() - } - - // Any entries? If not, return. - if sz == 0 { - return nil - } - - // Create the buffer, and copy all hot values and snapshots. Individual - // entries are sorted at this point, so now the code has to check if the - // resultant buffer will be sorted from start to finish. - var needSort bool - values := make(Values, sz) - n := 0 - for _, e := range entries { - e.mu.RLock() - if !needSort && n > 0 && len(e.values) > 0 { - needSort = values[n-1].UnixNano() >= e.values[0].UnixNano() - } - n += copy(values[n:], e.values) - e.mu.RUnlock() - } - values = values[:n] - - if needSort { - values = values.Deduplicate() - } - - return values -} - // Store returns the underlying cache store. This is not goroutine safe! // Protect access by using the Lock and Unlock functions on Cache. func (c *Cache) Store() map[string]*entry { diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 80e86c77c8..1ade9c737e 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -3,6 +3,7 @@ package tsm1 import ( "encoding/binary" "fmt" + "runtime" "time" "github.com/influxdata/influxdb/influxql" @@ -28,22 +29,63 @@ const ( encodedBlockHeaderSize = 1 ) +func init() { + // Prime the pools with with at one encoder/decoder for each available CPU + vals := make([]interface{}, 0, runtime.NumCPU()) + for _, p := range []*pool.Generic{ + timeEncoderPool, timeDecoderPool, + integerEncoderPool, integerDecoderPool, + floatDecoderPool, floatDecoderPool, + stringEncoderPool, stringEncoderPool, + booleanEncoderPool, booleanDecoderPool, + } { + vals = vals[:0] + // Check one out to force the allocation now and hold onto it + for i := 0; i < runtime.NumCPU(); i++ { + v := p.Get(tsdb.DefaultMaxPointsPerBlock) + vals = append(vals, v) + } + // Add them all back + for _, v := range vals { + p.Put(v) + } + } +} + var ( - timeEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + // encoder pools + timeEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewTimeEncoder(sz) }) - integerEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + integerEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewIntegerEncoder(sz) }) - floatEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + floatEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewFloatEncoder() }) - stringEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + stringEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewStringEncoder(sz) }) - booleanEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} { + booleanEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { return NewBooleanEncoder(sz) }) + + // decoder pools + timeDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &TimeDecoder{} + }) + integerDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &IntegerDecoder{} + }) + floatDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &FloatDecoder{} + }) + stringDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &StringDecoder{} + }) + booleanDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} { + return &BooleanDecoder{} + }) ) type Value interface { @@ -163,7 +205,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { switch blockType { case BlockFloat64: var buf []FloatValue - decoded, err := DecodeFloatBlock(block, &TimeDecoder{}, &FloatDecoder{}, &buf) + decoded, err := DecodeFloatBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -173,7 +215,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { return vals[:len(decoded)], err case BlockInteger: var buf []IntegerValue - decoded, err := DecodeIntegerBlock(block, &TimeDecoder{}, &IntegerDecoder{}, &buf) + decoded, err := DecodeIntegerBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -184,7 +226,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { case BlockBoolean: var buf []BooleanValue - decoded, err := DecodeBooleanBlock(block, &TimeDecoder{}, &BooleanDecoder{}, &buf) + decoded, err := DecodeBooleanBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -195,7 +237,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { case BlockString: var buf []StringValue - decoded, err := DecodeStringBlock(block, &TimeDecoder{}, &StringDecoder{}, &buf) + decoded, err := DecodeStringBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } @@ -245,38 +287,39 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { // frame-or-reference and run length encoding. tsenc := getTimeEncoder(len(values)) - for _, v := range values { - tsenc.Write(v.UnixNano()) - venc.Push(v.(*FloatValue).value) - } - venc.Finish() + var b []byte + err := func() error { + for _, v := range values { + tsenc.Write(v.UnixNano()) + venc.Push(v.(*FloatValue).value) + } + venc.Finish() - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsenc.Bytes() + if err != nil { + return err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsenc.Bytes() - if err != nil { - goto cleanup - } - // Encoded float values - vb, err = venc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + b = packBlock(buf, BlockFloat64, tb, vb) - // Prepend the first timestamp of the block in the first 8 bytes and the block - // in the next byte, followed by the block - b = packBlock(buf, BlockFloat64, tb, vb) + return nil + }() -cleanup: putTimeEncoder(tsenc) putFloatEncoder(venc) - return b, err + return b, err } -func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[]FloatValue) ([]FloatValue, error) { +func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) { // Block type is the next block, make sure we actually have a float block blockType := block[0] if blockType != BlockFloat64 { @@ -289,37 +332,50 @@ func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[] return nil, err } - // Setup our timestamp and value decoders - tdec.Init(tb) - if err := vdec.SetBytes(vb); err != nil { - return nil, err - } + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := floatDecoderPool.Get(0).(*FloatDecoder) - // Decode both a timestamp and value - i := 0 - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Values() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, FloatValue{ts, v}) + var i int + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + err = vdec.SetBytes(vb) + if err != nil { + return err } - i++ - } - // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() - } - // Did float decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() - } + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Values() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, FloatValue{ts, v}) + } + i++ + } - return (*a)[:i], nil + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + + // Did float decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() + + timeDecoderPool.Put(tdec) + floatDecoderPool.Put(vdec) + + return (*a)[:i], err } type BooleanValue struct { @@ -355,36 +411,37 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { // Encode timestamps using an adaptive encoder tsenc := getTimeEncoder(len(values)) - for _, v := range values { - tsenc.Write(v.UnixNano()) - venc.Write(v.(*BooleanValue).value) - } + var b []byte + err := func() error { + for _, v := range values { + tsenc.Write(v.UnixNano()) + venc.Write(v.(*BooleanValue).value) + } - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsenc.Bytes() + if err != nil { + return err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsenc.Bytes() - if err != nil { - goto cleanup - } - // Encoded float values - vb, err = venc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + b = packBlock(buf, BlockBoolean, tb, vb) + return nil + }() - // Prepend the first timestamp of the block in the first 8 bytes and the block - // in the next byte, followed by the block - b = packBlock(buf, BlockBoolean, tb, vb) - -cleanup: putTimeEncoder(tsenc) putBooleanEncoder(venc) + return b, err } -func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a *[]BooleanValue) ([]BooleanValue, error) { +func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) { // Block type is the next block, make sure we actually have a float block blockType := block[0] if blockType != BlockBoolean { @@ -397,35 +454,46 @@ func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a return nil, err } - // Setup our timestamp and value decoders - tdec.Init(tb) - vdec.SetBytes(vb) + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := booleanDecoderPool.Get(0).(*BooleanDecoder) - // Decode both a timestamp and value - i := 0 - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Read() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, BooleanValue{ts, v}) + var i int + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + vdec.SetBytes(vb) + + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Read() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, BooleanValue{ts, v}) + } + i++ } - i++ - } - // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() - } - // Did boolean decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() - } + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + // Did boolean decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() - return (*a)[:i], nil + timeDecoderPool.Put(tdec) + booleanDecoderPool.Put(vdec) + + return (*a)[:i], err } type IntegerValue struct { @@ -453,35 +521,36 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { tsEnc := getTimeEncoder(len(values)) vEnc := getIntegerEncoder(len(values)) - for _, v := range values { - tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*IntegerValue).value) - } + var b []byte + err := func() error { + for _, v := range values { + tsEnc.Write(v.UnixNano()) + vEnc.Write(v.(*IntegerValue).value) + } - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsEnc.Bytes() + if err != nil { + return err + } + // Encoded int64 values + vb, err := vEnc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsEnc.Bytes() - if err != nil { - goto cleanup - } - // Encoded int64 values - vb, err = vEnc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes + b = packBlock(buf, BlockInteger, tb, vb) + return nil + }() - // Prepend the first timestamp of the block in the first 8 bytes - b = packBlock(buf, BlockInteger, tb, vb) - -cleanup: putTimeEncoder(tsEnc) putIntegerEncoder(vEnc) + return b, err } -func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a *[]IntegerValue) ([]IntegerValue, error) { +func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) { blockType := block[0] if blockType != BlockInteger { return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockInteger, blockType) @@ -495,35 +564,46 @@ func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a return nil, err } - // Setup our timestamp and value decoders - tdec.Init(tb) - vdec.SetBytes(vb) + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := integerDecoderPool.Get(0).(*IntegerDecoder) - // Decode both a timestamp and value - i := 0 - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Read() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, IntegerValue{ts, v}) + var i int + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + vdec.SetBytes(vb) + + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Read() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, IntegerValue{ts, v}) + } + i++ } - i++ - } - // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() - } - // Did int64 decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() - } + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + // Did int64 decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() - return (*a)[:i], nil + timeDecoderPool.Put(tdec) + integerDecoderPool.Put(vdec) + + return (*a)[:i], err } type StringValue struct { @@ -551,35 +631,37 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { tsEnc := getTimeEncoder(len(values)) vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value)) - for _, v := range values { - tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*StringValue).value) - } + var b []byte + err := func() error { + for _, v := range values { + tsEnc.Write(v.UnixNano()) + vEnc.Write(v.(*StringValue).value) + } - var err error - var b, tb, vb []byte + // Encoded timestamp values + tb, err := tsEnc.Bytes() + if err != nil { + return err + } + // Encoded string values + vb, err := vEnc.Bytes() + if err != nil { + return err + } - // Encoded timestamp values - tb, err = tsEnc.Bytes() - if err != nil { - goto cleanup - } - // Encoded string values - vb, err = vEnc.Bytes() - if err != nil { - goto cleanup - } + // Prepend the first timestamp of the block in the first 8 bytes + b = packBlock(buf, BlockString, tb, vb) - // Prepend the first timestamp of the block in the first 8 bytes - b = packBlock(buf, BlockString, tb, vb) + return nil + }() -cleanup: putTimeEncoder(tsEnc) putStringEncoder(vEnc) + return b, err } -func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *[]StringValue) ([]StringValue, error) { +func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) { blockType := block[0] if blockType != BlockString { return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType) @@ -593,37 +675,49 @@ func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a * return nil, err } - // Setup our timestamp and value decoders - tdec.Init(tb) - if err := vdec.SetBytes(vb); err != nil { - return nil, err - } + tdec := timeDecoderPool.Get(0).(*TimeDecoder) + vdec := stringDecoderPool.Get(0).(*StringDecoder) - // Decode both a timestamp and value - i := 0 - for tdec.Next() && vdec.Next() { - ts := tdec.Read() - v := vdec.Read() - if i < len(*a) { - elem := &(*a)[i] - elem.unixnano = ts - elem.value = v - } else { - *a = append(*a, StringValue{ts, v}) + var i int + err = func() error { + // Setup our timestamp and value decoders + tdec.Init(tb) + err = vdec.SetBytes(vb) + if err != nil { + return err } - i++ - } - // Did timestamp decoding have an error? - if tdec.Error() != nil { - return nil, tdec.Error() - } - // Did string decoding have an error? - if vdec.Error() != nil { - return nil, vdec.Error() - } + // Decode both a timestamp and value + for tdec.Next() && vdec.Next() { + ts := tdec.Read() + v := vdec.Read() + if i < len(*a) { + elem := &(*a)[i] + elem.unixnano = ts + elem.value = v + } else { + *a = append(*a, StringValue{ts, v}) + } + i++ + } - return (*a)[:i], nil + // Did timestamp decoding have an error? + err = tdec.Error() + if err != nil { + return err + } + // Did string decoding have an error? + err = vdec.Error() + if err != nil { + return err + } + return nil + }() + + timeDecoderPool.Put(tdec) + stringDecoderPool.Put(vdec) + + return (*a)[:i], err } func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte { diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index 7d6ea7170f..b4b384da1c 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -1095,7 +1095,7 @@ func BenchmarkDecodeBlock_Float_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.FloatValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeFloatBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &decodedValues) + _, err = tsm1.DecodeFloatBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } @@ -1164,7 +1164,7 @@ func BenchmarkDecodeBlock_Integer_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.IntegerValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeIntegerBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &decodedValues) + _, err = tsm1.DecodeIntegerBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } @@ -1233,7 +1233,7 @@ func BenchmarkDecodeBlock_Boolean_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.BooleanValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeBooleanBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &decodedValues) + _, err = tsm1.DecodeBooleanBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } @@ -1302,7 +1302,7 @@ func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) { decodedValues := make([]tsm1.StringValue, len(values)) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = tsm1.DecodeStringBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &decodedValues) + _, err = tsm1.DecodeStringBlock(bytes, &decodedValues) if err != nil { b.Fatalf("unexpected error decoding block: %v", err) } diff --git a/tsdb/engine/tsm1/file_store.gen.go b/tsdb/engine/tsm1/file_store.gen.go index b30782e012..bf2ae09bd0 100644 --- a/tsdb/engine/tsm1/file_store.gen.go +++ b/tsdb/engine/tsm1/file_store.gen.go @@ -7,7 +7,7 @@ package tsm1 // ReadFloatBlock reads the next block as a set of float values. -func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[]FloatValue) ([]FloatValue, error) { +func (c *KeyCursor) ReadFloatBlock(buf *[]FloatValue) ([]FloatValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -16,7 +16,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadFloatBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadFloatBlockAt(&first.entry, buf) // Remove values we already read values = FloatValues(values).Exclude(first.readMin, first.readMax) @@ -81,7 +81,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ tombstones := cur.r.TombstoneRange(c.key) var a []FloatValue - v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadFloatBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -140,7 +140,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ tombstones := cur.r.TombstoneRange(c.key) var a []FloatValue - v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadFloatBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -167,7 +167,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[ } // ReadIntegerBlock reads the next block as a set of integer values. -func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, buf *[]IntegerValue) ([]IntegerValue, error) { +func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -176,7 +176,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadIntegerBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadIntegerBlockAt(&first.entry, buf) // Remove values we already read values = IntegerValues(values).Exclude(first.readMin, first.readMax) @@ -241,7 +241,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []IntegerValue - v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadIntegerBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -300,7 +300,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []IntegerValue - v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadIntegerBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -327,7 +327,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu } // ReadStringBlock reads the next block as a set of string values. -func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf *[]StringValue) ([]StringValue, error) { +func (c *KeyCursor) ReadStringBlock(buf *[]StringValue) ([]StringValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -336,7 +336,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadStringBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadStringBlockAt(&first.entry, buf) // Remove values we already read values = StringValues(values).Exclude(first.readMin, first.readMax) @@ -401,7 +401,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf tombstones := cur.r.TombstoneRange(c.key) var a []StringValue - v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadStringBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -460,7 +460,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf tombstones := cur.r.TombstoneRange(c.key) var a []StringValue - v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadStringBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -487,7 +487,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf } // ReadBooleanBlock reads the next block as a set of boolean values. -func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, buf *[]BooleanValue) ([]BooleanValue, error) { +func (c *KeyCursor) ReadBooleanBlock(buf *[]BooleanValue) ([]BooleanValue, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -496,7 +496,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.ReadBooleanBlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.ReadBooleanBlockAt(&first.entry, buf) // Remove values we already read values = BooleanValues(values).Exclude(first.readMin, first.readMax) @@ -561,7 +561,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []BooleanValue - v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadBooleanBlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -620,7 +620,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu tombstones := cur.r.TombstoneRange(c.key) var a []BooleanValue - v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.ReadBooleanBlockAt(&cur.entry, &a) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/file_store.gen.go.tmpl b/tsdb/engine/tsm1/file_store.gen.go.tmpl index 799d05792a..164b6baac3 100644 --- a/tsdb/engine/tsm1/file_store.gen.go.tmpl +++ b/tsdb/engine/tsm1/file_store.gen.go.tmpl @@ -3,7 +3,7 @@ package tsm1 {{range .}} // Read{{.Name}}Block reads the next block as a set of {{.name}} values. -func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder, buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) { +func (c *KeyCursor) Read{{.Name}}Block(buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) { // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -12,7 +12,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder // First block is the oldest block containing the points we're searching for. first := c.current[0] *buf = (*buf)[:0] - values, err := first.r.Read{{.Name}}BlockAt(&first.entry, tdec, vdec, buf) + values, err := first.r.Read{{.Name}}BlockAt(&first.entry, buf) // Remove values we already read values = {{.Name}}Values(values).Exclude(first.readMin, first.readMax) @@ -77,7 +77,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder tombstones := cur.r.TombstoneRange(c.key) var a []{{.Name}}Value - v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a) if err != nil { return nil, err } @@ -136,7 +136,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder tombstones := cur.r.TombstoneRange(c.key) var a []{{.Name}}Value - v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a) + v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index d3e5a10731..99a0c22d5a 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -28,10 +28,10 @@ type TSMFile interface { // ReadAt returns all the values in the block identified by entry. ReadAt(entry *IndexEntry, values []Value) ([]Value, error) - ReadFloatBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) - ReadIntegerBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) - ReadStringBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) - ReadBooleanBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) + ReadFloatBlockAt(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) + ReadIntegerBlockAt(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) + ReadStringBlockAt(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) + ReadBooleanBlockAt(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) // Entries returns the index entries for all blocks for the given key. Entries(key string) []IndexEntry @@ -680,14 +680,13 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error { // whether the key will be scan in ascending time order or descenging time order. // This function assumes the read-lock has been taken. func (f *FileStore) locations(key string, t int64, ascending bool) []*location { - var locations []*location - filesSnapshot := make([]TSMFile, len(f.files)) for i := range f.files { filesSnapshot[i] = f.files[i] } var entries []IndexEntry + locations := make([]*location, 0, len(filesSnapshot)) for _, fd := range filesSnapshot { minTime, maxTime := fd.TimeRange() @@ -799,13 +798,13 @@ func ParseTSMFileName(name string) (int, int, error) { id := base[:idx] - parts := strings.Split(id, "-") - if len(parts) != 2 { + idx = strings.Index(id, "-") + if idx == -1 { return 0, 0, fmt.Errorf("file %s is named incorrectly", name) } - generation, err := strconv.ParseUint(parts[0], 10, 32) - sequence, err := strconv.ParseUint(parts[1], 10, 32) + generation, err := strconv.ParseUint(id[:idx], 10, 32) + sequence, err := strconv.ParseUint(id[idx+1:], 10, 32) return int(generation), int(sequence), err } @@ -891,8 +890,9 @@ func newKeyCursor(fs *FileStore, key string, t int64, ascending bool) *KeyCursor fs: fs, seeks: fs.locations(key, t, ascending), ascending: ascending, - refs: map[string]TSMFile{}, } + c.refs = make(map[string]TSMFile, len(c.seeks)) + c.duplicates = c.hasOverlappingBlocks() if ascending { @@ -1025,7 +1025,7 @@ func (c *KeyCursor) nextAscending() { // Append the first matching block if len(c.current) == 0 { - c.current = append(c.current, &location{}) + c.current = append(c.current, nil) } else { c.current = c.current[:1] } diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index b96c094852..95dc3d7587 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -70,7 +70,7 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -110,7 +110,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -130,7 +130,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -149,7 +149,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -178,7 +178,7 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -220,7 +220,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -242,7 +242,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -287,7 +287,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -308,7 +308,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -353,7 +353,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -374,7 +374,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -419,7 +419,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -440,7 +440,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -485,7 +485,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -509,7 +509,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -529,7 +529,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -561,7 +561,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) { buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -585,7 +585,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -604,7 +604,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -636,7 +636,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) { buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -660,7 +660,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -679,7 +679,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -711,7 +711,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) { buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 0, true) // Search for an entry that exists in the second file - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -735,7 +735,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) { // Check that calling Next will dedupe points c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -754,7 +754,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) exp = nil if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) @@ -784,7 +784,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 3, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -801,7 +801,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -840,7 +840,7 @@ func TestFileStore_SeekToAsc_End(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 2, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -879,7 +879,7 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -918,7 +918,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 2, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -936,7 +936,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -977,7 +977,7 @@ func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -998,7 +998,7 @@ func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1042,7 +1042,7 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1063,7 +1063,7 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1105,7 +1105,7 @@ func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1126,7 +1126,7 @@ func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1168,7 +1168,7 @@ func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 5, false) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1189,7 +1189,7 @@ func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1229,7 +1229,7 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 4, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1268,7 +1268,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 10, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1289,7 +1289,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1311,7 +1311,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1332,7 +1332,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1365,7 +1365,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 11, false) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1388,7 +1388,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1409,7 +1409,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { } c.Next() - values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err = c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1442,7 +1442,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 11, false) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1463,7 +1463,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1485,7 +1485,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1506,7 +1506,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { } c.Next() - values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err = c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1539,7 +1539,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 11, false) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1560,7 +1560,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1582,7 +1582,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1603,7 +1603,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { } c.Next() - values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err = c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1639,7 +1639,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) { // Search for an entry that exists in the second file buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 3, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1659,7 +1659,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) { } } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1680,7 +1680,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) { } c.Next() - values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err = c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) @@ -1712,7 +1712,7 @@ func TestFileStore_SeekToDesc_End(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 2, false) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1756,7 +1756,7 @@ func TestKeyCursor_TombstoneRange(t *testing.T) { c := fs.KeyCursor("cpu", 0, true) expValues := []int{0, 2} for _, v := range expValues { - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1799,7 +1799,7 @@ func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) { buf := make([]tsm1.FloatValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + values, err := c.ReadFloatBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1843,7 +1843,7 @@ func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) { buf := make([]tsm1.IntegerValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + values, err := c.ReadIntegerBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1887,7 +1887,7 @@ func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) { buf := make([]tsm1.StringValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + values, err := c.ReadStringBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -1931,7 +1931,7 @@ func TestKeyCursor_TombstoneRange_PartialBoolean(t *testing.T) { buf := make([]tsm1.BooleanValue, 1000) c := fs.KeyCursor("cpu", 0, true) - values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + values, err := c.ReadBooleanBlock(&buf) if err != nil { t.Fatalf("unexpected error reading values: %v", err) } @@ -2064,23 +2064,21 @@ func TestFileStore_Replace(t *testing.T) { } // There should be two blocks (1 in each file) - tdec := &tsm1.TimeDecoder{} - vdec := &tsm1.FloatDecoder{} cur.Next() buf := make([]tsm1.FloatValue, 10) - values, err := cur.ReadFloatBlock(tdec, vdec, &buf) + values, err := cur.ReadFloatBlock(&buf) if got, exp := len(values), 1; got != exp { t.Fatalf("value len mismatch: got %v, exp %v", got, exp) } cur.Next() - values, err = cur.ReadFloatBlock(tdec, vdec, &buf) + values, err = cur.ReadFloatBlock(&buf) if got, exp := len(values), 1; got != exp { t.Fatalf("value len mismatch: got %v, exp %v", got, exp) } // No more blocks for this cursor cur.Next() - values, err = cur.ReadFloatBlock(tdec, vdec, &buf) + values, err = cur.ReadFloatBlock(&buf) if got, exp := len(values), 0; got != exp { t.Fatalf("value len mismatch: got %v, exp %v", got, exp) } diff --git a/tsdb/engine/tsm1/iterator.gen.go b/tsdb/engine/tsm1/iterator.gen.go index 4a19c3e916..762a046799 100644 --- a/tsdb/engine/tsm1/iterator.gen.go +++ b/tsdb/engine/tsm1/iterator.gen.go @@ -311,8 +311,6 @@ type floatAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec FloatDecoder buf []FloatValue values []FloatValue pos int @@ -330,7 +328,7 @@ func newFloatAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCu c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]FloatValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -412,7 +410,7 @@ func (c *floatAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -427,8 +425,6 @@ type floatDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec FloatDecoder buf []FloatValue values []FloatValue pos int @@ -449,7 +445,7 @@ func newFloatDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyC c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]FloatValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -534,7 +530,7 @@ func (c *floatDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -758,8 +754,6 @@ type integerAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec IntegerDecoder buf []IntegerValue values []IntegerValue pos int @@ -777,7 +771,7 @@ func newIntegerAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]IntegerValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -859,7 +853,7 @@ func (c *integerAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -874,8 +868,6 @@ type integerDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec IntegerDecoder buf []IntegerValue values []IntegerValue pos int @@ -896,7 +888,7 @@ func newIntegerDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Ke c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]IntegerValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -981,7 +973,7 @@ func (c *integerDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1205,8 +1197,6 @@ type stringAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec StringDecoder buf []StringValue values []StringValue pos int @@ -1224,7 +1214,7 @@ func newStringAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyC c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]StringValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1306,7 +1296,7 @@ func (c *stringAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1321,8 +1311,6 @@ type stringDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec StringDecoder buf []StringValue values []StringValue pos int @@ -1343,7 +1331,7 @@ func newStringDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]StringValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1428,7 +1416,7 @@ func (c *stringDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1652,8 +1640,6 @@ type booleanAscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec BooleanDecoder buf []BooleanValue values []BooleanValue pos int @@ -1671,7 +1657,7 @@ func newBooleanAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]BooleanValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1753,7 +1739,7 @@ func (c *booleanAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -1768,8 +1754,6 @@ type booleanDescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec BooleanDecoder buf []BooleanValue values []BooleanValue pos int @@ -1790,7 +1774,7 @@ func newBooleanDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Ke c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]BooleanValue, 10) - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -1875,7 +1859,7 @@ func (c *booleanDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf) if len(c.tsm.values) == 0 { return } diff --git a/tsdb/engine/tsm1/iterator.gen.go.tmpl b/tsdb/engine/tsm1/iterator.gen.go.tmpl index 92b22a405b..2e1f5c0c20 100644 --- a/tsdb/engine/tsm1/iterator.gen.go.tmpl +++ b/tsdb/engine/tsm1/iterator.gen.go.tmpl @@ -307,8 +307,6 @@ type {{.name}}AscendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec {{.Name}}Decoder buf []{{.Name}}Value values []{{.Name}}Value pos int @@ -326,7 +324,7 @@ func new{{.Name}}AscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *K c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]{{.Name}}Value, 10) - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -408,7 +406,7 @@ func (c *{{.name}}AscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) if len(c.tsm.values) == 0 { return } @@ -423,8 +421,6 @@ type {{.name}}DescendingCursor struct { } tsm struct { - tdec TimeDecoder - vdec {{.Name}}Decoder buf []{{.Name}}Value values []{{.Name}}Value pos int @@ -445,7 +441,7 @@ func new{{.Name}}DescendingCursor(seek int64, cacheValues Values, tsmKeyCursor * c.tsm.keyCursor = tsmKeyCursor c.tsm.buf = make([]{{.Name}}Value, 10) - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) @@ -530,7 +526,7 @@ func (c *{{.name}}DescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf) + c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf) if len(c.tsm.values) == 0 { return } diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 9186c20024..daaa516b42 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -163,10 +163,10 @@ type blockAccessor interface { read(key string, timestamp int64) ([]Value, error) readAll(key string) ([]Value, error) readBlock(entry *IndexEntry, values []Value) ([]Value, error) - readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, fdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) - readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) - readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) - readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) + readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) + readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) + readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) + readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) rename(path string) error path() string @@ -254,30 +254,30 @@ func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { return v, err } -func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, vals *[]FloatValue) ([]FloatValue, error) { +func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) { t.mu.RLock() - v, err := t.accessor.readFloatBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readFloatBlock(entry, vals) t.mu.RUnlock() return v, err } -func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, vals *[]IntegerValue) ([]IntegerValue, error) { +func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) { t.mu.RLock() - v, err := t.accessor.readIntegerBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readIntegerBlock(entry, vals) t.mu.RUnlock() return v, err } -func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, vals *[]StringValue) ([]StringValue, error) { +func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) { t.mu.RLock() - v, err := t.accessor.readStringBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readStringBlock(entry, vals) t.mu.RUnlock() return v, err } -func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, vals *[]BooleanValue) ([]BooleanValue, error) { +func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) { t.mu.RLock() - v, err := t.accessor.readBooleanBlock(entry, tdec, vdec, vals) + v, err := t.accessor.readBooleanBlock(entry, vals) t.mu.RUnlock() return v, err } @@ -1041,7 +1041,7 @@ func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, er return values, nil } -func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) { +func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1049,7 +1049,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec return nil, ErrTSMClosed } - a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil { @@ -1059,7 +1059,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec return a, nil } -func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) { +func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1067,7 +1067,7 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vd return nil, ErrTSMClosed } - a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil { @@ -1077,7 +1077,7 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vd return a, nil } -func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) { +func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1085,7 +1085,7 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vde return nil, ErrTSMClosed } - a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil { @@ -1095,7 +1095,7 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vde return a, nil } -func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) { +func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) { m.mu.RLock() if int64(len(m.b)) < entry.Offset+int64(entry.Size) { @@ -1103,7 +1103,7 @@ func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vd return nil, ErrTSMClosed } - a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) + a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) m.mu.RUnlock() if err != nil {