commit
aeb84b3737
|
@ -349,9 +349,65 @@ func (c *Cache) Keys() []string {
|
|||
|
||||
// Values returns a copy of all values, deduped and sorted, for the given key.
|
||||
func (c *Cache) Values(key string) Values {
|
||||
var snapshotEntries *entry
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.merged(key)
|
||||
e := c.store[key]
|
||||
if c.snapshot != nil {
|
||||
snapshotEntries = c.snapshot.store[key]
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
if e == nil {
|
||||
if snapshotEntries == nil {
|
||||
// No values in hot cache or snapshots.
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
e.deduplicate()
|
||||
}
|
||||
|
||||
// Build the sequence of entries that will be returned, in the correct order.
|
||||
// Calculate the required size of the destination buffer.
|
||||
var entries []*entry
|
||||
sz := 0
|
||||
|
||||
if snapshotEntries != nil {
|
||||
snapshotEntries.deduplicate() // guarantee we are deduplicated
|
||||
entries = append(entries, snapshotEntries)
|
||||
sz += snapshotEntries.count()
|
||||
}
|
||||
|
||||
if e != nil {
|
||||
entries = append(entries, e)
|
||||
sz += e.count()
|
||||
}
|
||||
|
||||
// Any entries? If not, return.
|
||||
if sz == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create the buffer, and copy all hot values and snapshots. Individual
|
||||
// entries are sorted at this point, so now the code has to check if the
|
||||
// resultant buffer will be sorted from start to finish.
|
||||
var needSort bool
|
||||
values := make(Values, sz)
|
||||
n := 0
|
||||
for _, e := range entries {
|
||||
e.mu.RLock()
|
||||
if !needSort && n > 0 && len(e.values) > 0 {
|
||||
needSort = values[n-1].UnixNano() >= e.values[0].UnixNano()
|
||||
}
|
||||
n += copy(values[n:], e.values)
|
||||
e.mu.RUnlock()
|
||||
}
|
||||
values = values[:n]
|
||||
|
||||
if needSort {
|
||||
values = values.Deduplicate()
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
// Delete will remove the keys from the cache
|
||||
|
@ -397,68 +453,6 @@ func (c *Cache) SetMaxSize(size uint64) {
|
|||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// merged returns a copy of hot and snapshot values. The copy will be merged, deduped, and
|
||||
// sorted. It assumes all necessary locks have been taken. If the caller knows that the
|
||||
// the hot source data for the key will not be changed, it is safe to call this function
|
||||
// with a read-lock taken. Otherwise it must be called with a write-lock taken.
|
||||
func (c *Cache) merged(key string) Values {
|
||||
e := c.store[key]
|
||||
if e == nil {
|
||||
if c.snapshot == nil {
|
||||
// No values in hot cache or snapshots.
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
e.deduplicate()
|
||||
}
|
||||
|
||||
// Build the sequence of entries that will be returned, in the correct order.
|
||||
// Calculate the required size of the destination buffer.
|
||||
var entries []*entry
|
||||
sz := 0
|
||||
|
||||
if c.snapshot != nil {
|
||||
snapshotEntries := c.snapshot.store[key]
|
||||
if snapshotEntries != nil {
|
||||
snapshotEntries.deduplicate() // guarantee we are deduplicated
|
||||
entries = append(entries, snapshotEntries)
|
||||
sz += snapshotEntries.count()
|
||||
}
|
||||
}
|
||||
|
||||
if e != nil {
|
||||
entries = append(entries, e)
|
||||
sz += e.count()
|
||||
}
|
||||
|
||||
// Any entries? If not, return.
|
||||
if sz == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create the buffer, and copy all hot values and snapshots. Individual
|
||||
// entries are sorted at this point, so now the code has to check if the
|
||||
// resultant buffer will be sorted from start to finish.
|
||||
var needSort bool
|
||||
values := make(Values, sz)
|
||||
n := 0
|
||||
for _, e := range entries {
|
||||
e.mu.RLock()
|
||||
if !needSort && n > 0 && len(e.values) > 0 {
|
||||
needSort = values[n-1].UnixNano() >= e.values[0].UnixNano()
|
||||
}
|
||||
n += copy(values[n:], e.values)
|
||||
e.mu.RUnlock()
|
||||
}
|
||||
values = values[:n]
|
||||
|
||||
if needSort {
|
||||
values = values.Deduplicate()
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
// Store returns the underlying cache store. This is not goroutine safe!
|
||||
// Protect access by using the Lock and Unlock functions on Cache.
|
||||
func (c *Cache) Store() map[string]*entry {
|
||||
|
|
|
@ -3,6 +3,7 @@ package tsm1
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
|
@ -28,22 +29,63 @@ const (
|
|||
encodedBlockHeaderSize = 1
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Prime the pools with with at one encoder/decoder for each available CPU
|
||||
vals := make([]interface{}, 0, runtime.NumCPU())
|
||||
for _, p := range []*pool.Generic{
|
||||
timeEncoderPool, timeDecoderPool,
|
||||
integerEncoderPool, integerDecoderPool,
|
||||
floatDecoderPool, floatDecoderPool,
|
||||
stringEncoderPool, stringEncoderPool,
|
||||
booleanEncoderPool, booleanDecoderPool,
|
||||
} {
|
||||
vals = vals[:0]
|
||||
// Check one out to force the allocation now and hold onto it
|
||||
for i := 0; i < runtime.NumCPU(); i++ {
|
||||
v := p.Get(tsdb.DefaultMaxPointsPerBlock)
|
||||
vals = append(vals, v)
|
||||
}
|
||||
// Add them all back
|
||||
for _, v := range vals {
|
||||
p.Put(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
timeEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
|
||||
// encoder pools
|
||||
timeEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return NewTimeEncoder(sz)
|
||||
})
|
||||
integerEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
|
||||
integerEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return NewIntegerEncoder(sz)
|
||||
})
|
||||
floatEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
|
||||
floatEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return NewFloatEncoder()
|
||||
})
|
||||
stringEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
|
||||
stringEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return NewStringEncoder(sz)
|
||||
})
|
||||
booleanEncoderPool = pool.NewGeneric(1024, func(sz int) interface{} {
|
||||
booleanEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return NewBooleanEncoder(sz)
|
||||
})
|
||||
|
||||
// decoder pools
|
||||
timeDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return &TimeDecoder{}
|
||||
})
|
||||
integerDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return &IntegerDecoder{}
|
||||
})
|
||||
floatDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return &FloatDecoder{}
|
||||
})
|
||||
stringDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return &StringDecoder{}
|
||||
})
|
||||
booleanDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
|
||||
return &BooleanDecoder{}
|
||||
})
|
||||
)
|
||||
|
||||
type Value interface {
|
||||
|
@ -163,7 +205,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
|
|||
switch blockType {
|
||||
case BlockFloat64:
|
||||
var buf []FloatValue
|
||||
decoded, err := DecodeFloatBlock(block, &TimeDecoder{}, &FloatDecoder{}, &buf)
|
||||
decoded, err := DecodeFloatBlock(block, &buf)
|
||||
if len(vals) < len(decoded) {
|
||||
vals = make([]Value, len(decoded))
|
||||
}
|
||||
|
@ -173,7 +215,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
|
|||
return vals[:len(decoded)], err
|
||||
case BlockInteger:
|
||||
var buf []IntegerValue
|
||||
decoded, err := DecodeIntegerBlock(block, &TimeDecoder{}, &IntegerDecoder{}, &buf)
|
||||
decoded, err := DecodeIntegerBlock(block, &buf)
|
||||
if len(vals) < len(decoded) {
|
||||
vals = make([]Value, len(decoded))
|
||||
}
|
||||
|
@ -184,7 +226,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
|
|||
|
||||
case BlockBoolean:
|
||||
var buf []BooleanValue
|
||||
decoded, err := DecodeBooleanBlock(block, &TimeDecoder{}, &BooleanDecoder{}, &buf)
|
||||
decoded, err := DecodeBooleanBlock(block, &buf)
|
||||
if len(vals) < len(decoded) {
|
||||
vals = make([]Value, len(decoded))
|
||||
}
|
||||
|
@ -195,7 +237,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
|
|||
|
||||
case BlockString:
|
||||
var buf []StringValue
|
||||
decoded, err := DecodeStringBlock(block, &TimeDecoder{}, &StringDecoder{}, &buf)
|
||||
decoded, err := DecodeStringBlock(block, &buf)
|
||||
if len(vals) < len(decoded) {
|
||||
vals = make([]Value, len(decoded))
|
||||
}
|
||||
|
@ -245,38 +287,39 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
|
|||
// frame-or-reference and run length encoding.
|
||||
tsenc := getTimeEncoder(len(values))
|
||||
|
||||
for _, v := range values {
|
||||
tsenc.Write(v.UnixNano())
|
||||
venc.Push(v.(*FloatValue).value)
|
||||
}
|
||||
venc.Finish()
|
||||
var b []byte
|
||||
err := func() error {
|
||||
for _, v := range values {
|
||||
tsenc.Write(v.UnixNano())
|
||||
venc.Push(v.(*FloatValue).value)
|
||||
}
|
||||
venc.Finish()
|
||||
|
||||
var err error
|
||||
var b, tb, vb []byte
|
||||
// Encoded timestamp values
|
||||
tb, err := tsenc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Encoded float values
|
||||
vb, err := venc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Encoded timestamp values
|
||||
tb, err = tsenc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Encoded float values
|
||||
vb, err = venc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Prepend the first timestamp of the block in the first 8 bytes and the block
|
||||
// in the next byte, followed by the block
|
||||
b = packBlock(buf, BlockFloat64, tb, vb)
|
||||
|
||||
// Prepend the first timestamp of the block in the first 8 bytes and the block
|
||||
// in the next byte, followed by the block
|
||||
b = packBlock(buf, BlockFloat64, tb, vb)
|
||||
return nil
|
||||
}()
|
||||
|
||||
cleanup:
|
||||
putTimeEncoder(tsenc)
|
||||
putFloatEncoder(venc)
|
||||
return b, err
|
||||
|
||||
return b, err
|
||||
}
|
||||
|
||||
func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[]FloatValue) ([]FloatValue, error) {
|
||||
func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
|
||||
// Block type is the next block, make sure we actually have a float block
|
||||
blockType := block[0]
|
||||
if blockType != BlockFloat64 {
|
||||
|
@ -289,37 +332,50 @@ func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[]
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
if err := vdec.SetBytes(vb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tdec := timeDecoderPool.Get(0).(*TimeDecoder)
|
||||
vdec := floatDecoderPool.Get(0).(*FloatDecoder)
|
||||
|
||||
// Decode both a timestamp and value
|
||||
i := 0
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Values()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, FloatValue{ts, v})
|
||||
var i int
|
||||
err = func() error {
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
err = vdec.SetBytes(vb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// Did timestamp decoding have an error?
|
||||
if tdec.Error() != nil {
|
||||
return nil, tdec.Error()
|
||||
}
|
||||
// Did float decoding have an error?
|
||||
if vdec.Error() != nil {
|
||||
return nil, vdec.Error()
|
||||
}
|
||||
// Decode both a timestamp and value
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Values()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, FloatValue{ts, v})
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
return (*a)[:i], nil
|
||||
// Did timestamp decoding have an error?
|
||||
err = tdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Did float decoding have an error?
|
||||
err = vdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
|
||||
timeDecoderPool.Put(tdec)
|
||||
floatDecoderPool.Put(vdec)
|
||||
|
||||
return (*a)[:i], err
|
||||
}
|
||||
|
||||
type BooleanValue struct {
|
||||
|
@ -355,36 +411,37 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
|
|||
// Encode timestamps using an adaptive encoder
|
||||
tsenc := getTimeEncoder(len(values))
|
||||
|
||||
for _, v := range values {
|
||||
tsenc.Write(v.UnixNano())
|
||||
venc.Write(v.(*BooleanValue).value)
|
||||
}
|
||||
var b []byte
|
||||
err := func() error {
|
||||
for _, v := range values {
|
||||
tsenc.Write(v.UnixNano())
|
||||
venc.Write(v.(*BooleanValue).value)
|
||||
}
|
||||
|
||||
var err error
|
||||
var b, tb, vb []byte
|
||||
// Encoded timestamp values
|
||||
tb, err := tsenc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Encoded float values
|
||||
vb, err := venc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Encoded timestamp values
|
||||
tb, err = tsenc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Encoded float values
|
||||
vb, err = venc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Prepend the first timestamp of the block in the first 8 bytes and the block
|
||||
// in the next byte, followed by the block
|
||||
b = packBlock(buf, BlockBoolean, tb, vb)
|
||||
return nil
|
||||
}()
|
||||
|
||||
// Prepend the first timestamp of the block in the first 8 bytes and the block
|
||||
// in the next byte, followed by the block
|
||||
b = packBlock(buf, BlockBoolean, tb, vb)
|
||||
|
||||
cleanup:
|
||||
putTimeEncoder(tsenc)
|
||||
putBooleanEncoder(venc)
|
||||
|
||||
return b, err
|
||||
}
|
||||
|
||||
func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a *[]BooleanValue) ([]BooleanValue, error) {
|
||||
func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) {
|
||||
// Block type is the next block, make sure we actually have a float block
|
||||
blockType := block[0]
|
||||
if blockType != BlockBoolean {
|
||||
|
@ -397,35 +454,46 @@ func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
vdec.SetBytes(vb)
|
||||
tdec := timeDecoderPool.Get(0).(*TimeDecoder)
|
||||
vdec := booleanDecoderPool.Get(0).(*BooleanDecoder)
|
||||
|
||||
// Decode both a timestamp and value
|
||||
i := 0
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Read()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, BooleanValue{ts, v})
|
||||
var i int
|
||||
err = func() error {
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
vdec.SetBytes(vb)
|
||||
|
||||
// Decode both a timestamp and value
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Read()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, BooleanValue{ts, v})
|
||||
}
|
||||
i++
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// Did timestamp decoding have an error?
|
||||
if tdec.Error() != nil {
|
||||
return nil, tdec.Error()
|
||||
}
|
||||
// Did boolean decoding have an error?
|
||||
if vdec.Error() != nil {
|
||||
return nil, vdec.Error()
|
||||
}
|
||||
// Did timestamp decoding have an error?
|
||||
err = tdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Did boolean decoding have an error?
|
||||
err = vdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
|
||||
return (*a)[:i], nil
|
||||
timeDecoderPool.Put(tdec)
|
||||
booleanDecoderPool.Put(vdec)
|
||||
|
||||
return (*a)[:i], err
|
||||
}
|
||||
|
||||
type IntegerValue struct {
|
||||
|
@ -453,35 +521,36 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
|
|||
tsEnc := getTimeEncoder(len(values))
|
||||
vEnc := getIntegerEncoder(len(values))
|
||||
|
||||
for _, v := range values {
|
||||
tsEnc.Write(v.UnixNano())
|
||||
vEnc.Write(v.(*IntegerValue).value)
|
||||
}
|
||||
var b []byte
|
||||
err := func() error {
|
||||
for _, v := range values {
|
||||
tsEnc.Write(v.UnixNano())
|
||||
vEnc.Write(v.(*IntegerValue).value)
|
||||
}
|
||||
|
||||
var err error
|
||||
var b, tb, vb []byte
|
||||
// Encoded timestamp values
|
||||
tb, err := tsEnc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Encoded int64 values
|
||||
vb, err := vEnc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Encoded timestamp values
|
||||
tb, err = tsEnc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Encoded int64 values
|
||||
vb, err = vEnc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Prepend the first timestamp of the block in the first 8 bytes
|
||||
b = packBlock(buf, BlockInteger, tb, vb)
|
||||
return nil
|
||||
}()
|
||||
|
||||
// Prepend the first timestamp of the block in the first 8 bytes
|
||||
b = packBlock(buf, BlockInteger, tb, vb)
|
||||
|
||||
cleanup:
|
||||
putTimeEncoder(tsEnc)
|
||||
putIntegerEncoder(vEnc)
|
||||
|
||||
return b, err
|
||||
}
|
||||
|
||||
func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a *[]IntegerValue) ([]IntegerValue, error) {
|
||||
func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) {
|
||||
blockType := block[0]
|
||||
if blockType != BlockInteger {
|
||||
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockInteger, blockType)
|
||||
|
@ -495,35 +564,46 @@ func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
vdec.SetBytes(vb)
|
||||
tdec := timeDecoderPool.Get(0).(*TimeDecoder)
|
||||
vdec := integerDecoderPool.Get(0).(*IntegerDecoder)
|
||||
|
||||
// Decode both a timestamp and value
|
||||
i := 0
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Read()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, IntegerValue{ts, v})
|
||||
var i int
|
||||
err = func() error {
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
vdec.SetBytes(vb)
|
||||
|
||||
// Decode both a timestamp and value
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Read()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, IntegerValue{ts, v})
|
||||
}
|
||||
i++
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// Did timestamp decoding have an error?
|
||||
if tdec.Error() != nil {
|
||||
return nil, tdec.Error()
|
||||
}
|
||||
// Did int64 decoding have an error?
|
||||
if vdec.Error() != nil {
|
||||
return nil, vdec.Error()
|
||||
}
|
||||
// Did timestamp decoding have an error?
|
||||
err = tdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Did int64 decoding have an error?
|
||||
err = vdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
|
||||
return (*a)[:i], nil
|
||||
timeDecoderPool.Put(tdec)
|
||||
integerDecoderPool.Put(vdec)
|
||||
|
||||
return (*a)[:i], err
|
||||
}
|
||||
|
||||
type StringValue struct {
|
||||
|
@ -551,35 +631,37 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
|
|||
tsEnc := getTimeEncoder(len(values))
|
||||
vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value))
|
||||
|
||||
for _, v := range values {
|
||||
tsEnc.Write(v.UnixNano())
|
||||
vEnc.Write(v.(*StringValue).value)
|
||||
}
|
||||
var b []byte
|
||||
err := func() error {
|
||||
for _, v := range values {
|
||||
tsEnc.Write(v.UnixNano())
|
||||
vEnc.Write(v.(*StringValue).value)
|
||||
}
|
||||
|
||||
var err error
|
||||
var b, tb, vb []byte
|
||||
// Encoded timestamp values
|
||||
tb, err := tsEnc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Encoded string values
|
||||
vb, err := vEnc.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Encoded timestamp values
|
||||
tb, err = tsEnc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Encoded string values
|
||||
vb, err = vEnc.Bytes()
|
||||
if err != nil {
|
||||
goto cleanup
|
||||
}
|
||||
// Prepend the first timestamp of the block in the first 8 bytes
|
||||
b = packBlock(buf, BlockString, tb, vb)
|
||||
|
||||
// Prepend the first timestamp of the block in the first 8 bytes
|
||||
b = packBlock(buf, BlockString, tb, vb)
|
||||
return nil
|
||||
}()
|
||||
|
||||
cleanup:
|
||||
putTimeEncoder(tsEnc)
|
||||
putStringEncoder(vEnc)
|
||||
|
||||
return b, err
|
||||
}
|
||||
|
||||
func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *[]StringValue) ([]StringValue, error) {
|
||||
func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) {
|
||||
blockType := block[0]
|
||||
if blockType != BlockString {
|
||||
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType)
|
||||
|
@ -593,37 +675,49 @@ func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
if err := vdec.SetBytes(vb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tdec := timeDecoderPool.Get(0).(*TimeDecoder)
|
||||
vdec := stringDecoderPool.Get(0).(*StringDecoder)
|
||||
|
||||
// Decode both a timestamp and value
|
||||
i := 0
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Read()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, StringValue{ts, v})
|
||||
var i int
|
||||
err = func() error {
|
||||
// Setup our timestamp and value decoders
|
||||
tdec.Init(tb)
|
||||
err = vdec.SetBytes(vb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// Did timestamp decoding have an error?
|
||||
if tdec.Error() != nil {
|
||||
return nil, tdec.Error()
|
||||
}
|
||||
// Did string decoding have an error?
|
||||
if vdec.Error() != nil {
|
||||
return nil, vdec.Error()
|
||||
}
|
||||
// Decode both a timestamp and value
|
||||
for tdec.Next() && vdec.Next() {
|
||||
ts := tdec.Read()
|
||||
v := vdec.Read()
|
||||
if i < len(*a) {
|
||||
elem := &(*a)[i]
|
||||
elem.unixnano = ts
|
||||
elem.value = v
|
||||
} else {
|
||||
*a = append(*a, StringValue{ts, v})
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
return (*a)[:i], nil
|
||||
// Did timestamp decoding have an error?
|
||||
err = tdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Did string decoding have an error?
|
||||
err = vdec.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
|
||||
timeDecoderPool.Put(tdec)
|
||||
stringDecoderPool.Put(vdec)
|
||||
|
||||
return (*a)[:i], err
|
||||
}
|
||||
|
||||
func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte {
|
||||
|
|
|
@ -1095,7 +1095,7 @@ func BenchmarkDecodeBlock_Float_TypeSpecific(b *testing.B) {
|
|||
decodedValues := make([]tsm1.FloatValue, len(values))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err = tsm1.DecodeFloatBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &decodedValues)
|
||||
_, err = tsm1.DecodeFloatBlock(bytes, &decodedValues)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error decoding block: %v", err)
|
||||
}
|
||||
|
@ -1164,7 +1164,7 @@ func BenchmarkDecodeBlock_Integer_TypeSpecific(b *testing.B) {
|
|||
decodedValues := make([]tsm1.IntegerValue, len(values))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err = tsm1.DecodeIntegerBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &decodedValues)
|
||||
_, err = tsm1.DecodeIntegerBlock(bytes, &decodedValues)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error decoding block: %v", err)
|
||||
}
|
||||
|
@ -1233,7 +1233,7 @@ func BenchmarkDecodeBlock_Boolean_TypeSpecific(b *testing.B) {
|
|||
decodedValues := make([]tsm1.BooleanValue, len(values))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err = tsm1.DecodeBooleanBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &decodedValues)
|
||||
_, err = tsm1.DecodeBooleanBlock(bytes, &decodedValues)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error decoding block: %v", err)
|
||||
}
|
||||
|
@ -1302,7 +1302,7 @@ func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) {
|
|||
decodedValues := make([]tsm1.StringValue, len(values))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err = tsm1.DecodeStringBlock(bytes, &tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &decodedValues)
|
||||
_, err = tsm1.DecodeStringBlock(bytes, &decodedValues)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error decoding block: %v", err)
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
package tsm1
|
||||
|
||||
// ReadFloatBlock reads the next block as a set of float values.
|
||||
func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[]FloatValue) ([]FloatValue, error) {
|
||||
func (c *KeyCursor) ReadFloatBlock(buf *[]FloatValue) ([]FloatValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
|
@ -16,7 +16,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[
|
|||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadFloatBlockAt(&first.entry, tdec, vdec, buf)
|
||||
values, err := first.r.ReadFloatBlockAt(&first.entry, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = FloatValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
@ -81,7 +81,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[
|
|||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []FloatValue
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -140,7 +140,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[
|
|||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []FloatValue
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[
|
|||
}
|
||||
|
||||
// ReadIntegerBlock reads the next block as a set of integer values.
|
||||
func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, buf *[]IntegerValue) ([]IntegerValue, error) {
|
||||
func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
|
@ -176,7 +176,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu
|
|||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadIntegerBlockAt(&first.entry, tdec, vdec, buf)
|
||||
values, err := first.r.ReadIntegerBlockAt(&first.entry, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = IntegerValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
@ -241,7 +241,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu
|
|||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []IntegerValue
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu
|
|||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []IntegerValue
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -327,7 +327,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu
|
|||
}
|
||||
|
||||
// ReadStringBlock reads the next block as a set of string values.
|
||||
func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf *[]StringValue) ([]StringValue, error) {
|
||||
func (c *KeyCursor) ReadStringBlock(buf *[]StringValue) ([]StringValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
|
@ -336,7 +336,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf
|
|||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadStringBlockAt(&first.entry, tdec, vdec, buf)
|
||||
values, err := first.r.ReadStringBlockAt(&first.entry, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = StringValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
@ -401,7 +401,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf
|
|||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []StringValue
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -460,7 +460,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf
|
|||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []StringValue
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -487,7 +487,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf
|
|||
}
|
||||
|
||||
// ReadBooleanBlock reads the next block as a set of boolean values.
|
||||
func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, buf *[]BooleanValue) ([]BooleanValue, error) {
|
||||
func (c *KeyCursor) ReadBooleanBlock(buf *[]BooleanValue) ([]BooleanValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
|
@ -496,7 +496,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu
|
|||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadBooleanBlockAt(&first.entry, tdec, vdec, buf)
|
||||
values, err := first.r.ReadBooleanBlockAt(&first.entry, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = BooleanValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
@ -561,7 +561,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu
|
|||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []BooleanValue
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -620,7 +620,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu
|
|||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []BooleanValue
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package tsm1
|
|||
|
||||
{{range .}}
|
||||
// Read{{.Name}}Block reads the next block as a set of {{.name}} values.
|
||||
func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder, buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) {
|
||||
func (c *KeyCursor) Read{{.Name}}Block(buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
|
@ -12,7 +12,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder
|
|||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.Read{{.Name}}BlockAt(&first.entry, tdec, vdec, buf)
|
||||
values, err := first.r.Read{{.Name}}BlockAt(&first.entry, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = {{.Name}}Values(values).Exclude(first.readMin, first.readMax)
|
||||
|
@ -77,7 +77,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder
|
|||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []{{.Name}}Value
|
||||
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -136,7 +136,7 @@ func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder
|
|||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []{{.Name}}Value
|
||||
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a)
|
||||
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -28,10 +28,10 @@ type TSMFile interface {
|
|||
|
||||
// ReadAt returns all the values in the block identified by entry.
|
||||
ReadAt(entry *IndexEntry, values []Value) ([]Value, error)
|
||||
ReadFloatBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error)
|
||||
ReadIntegerBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error)
|
||||
ReadStringBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error)
|
||||
ReadBooleanBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error)
|
||||
ReadFloatBlockAt(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
|
||||
ReadIntegerBlockAt(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
|
||||
ReadStringBlockAt(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
|
||||
ReadBooleanBlockAt(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
|
||||
|
||||
// Entries returns the index entries for all blocks for the given key.
|
||||
Entries(key string) []IndexEntry
|
||||
|
@ -680,14 +680,13 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
|
|||
// whether the key will be scan in ascending time order or descenging time order.
|
||||
// This function assumes the read-lock has been taken.
|
||||
func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
|
||||
var locations []*location
|
||||
|
||||
filesSnapshot := make([]TSMFile, len(f.files))
|
||||
for i := range f.files {
|
||||
filesSnapshot[i] = f.files[i]
|
||||
}
|
||||
|
||||
var entries []IndexEntry
|
||||
locations := make([]*location, 0, len(filesSnapshot))
|
||||
for _, fd := range filesSnapshot {
|
||||
minTime, maxTime := fd.TimeRange()
|
||||
|
||||
|
@ -799,13 +798,13 @@ func ParseTSMFileName(name string) (int, int, error) {
|
|||
|
||||
id := base[:idx]
|
||||
|
||||
parts := strings.Split(id, "-")
|
||||
if len(parts) != 2 {
|
||||
idx = strings.Index(id, "-")
|
||||
if idx == -1 {
|
||||
return 0, 0, fmt.Errorf("file %s is named incorrectly", name)
|
||||
}
|
||||
|
||||
generation, err := strconv.ParseUint(parts[0], 10, 32)
|
||||
sequence, err := strconv.ParseUint(parts[1], 10, 32)
|
||||
generation, err := strconv.ParseUint(id[:idx], 10, 32)
|
||||
sequence, err := strconv.ParseUint(id[idx+1:], 10, 32)
|
||||
|
||||
return int(generation), int(sequence), err
|
||||
}
|
||||
|
@ -891,8 +890,9 @@ func newKeyCursor(fs *FileStore, key string, t int64, ascending bool) *KeyCursor
|
|||
fs: fs,
|
||||
seeks: fs.locations(key, t, ascending),
|
||||
ascending: ascending,
|
||||
refs: map[string]TSMFile{},
|
||||
}
|
||||
c.refs = make(map[string]TSMFile, len(c.seeks))
|
||||
|
||||
c.duplicates = c.hasOverlappingBlocks()
|
||||
|
||||
if ascending {
|
||||
|
@ -1025,7 +1025,7 @@ func (c *KeyCursor) nextAscending() {
|
|||
|
||||
// Append the first matching block
|
||||
if len(c.current) == 0 {
|
||||
c.current = append(c.current, &location{})
|
||||
c.current = append(c.current, nil)
|
||||
} else {
|
||||
c.current = c.current[:1]
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) {
|
|||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
// Search for an entry that exists in the second file
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
|
|||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
// Search for an entry that exists in the second file
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
|
|||
|
||||
// Check that calling Next will dedupe points
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
exp = nil
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -178,7 +178,7 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -220,7 +220,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -242,7 +242,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -287,7 +287,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.IntegerValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err := c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -308,7 +308,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err = c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -353,7 +353,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.BooleanValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err := c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -374,7 +374,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err = c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -419,7 +419,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.StringValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err := c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -440,7 +440,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err = c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -485,7 +485,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) {
|
|||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
// Search for an entry that exists in the second file
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -509,7 +509,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) {
|
|||
|
||||
// Check that calling Next will dedupe points
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -529,7 +529,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
exp = nil
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -561,7 +561,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) {
|
|||
buf := make([]tsm1.IntegerValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
// Search for an entry that exists in the second file
|
||||
values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err := c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -585,7 +585,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) {
|
|||
|
||||
// Check that calling Next will dedupe points
|
||||
c.Next()
|
||||
values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err = c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -604,7 +604,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err = c.ReadIntegerBlock(&buf)
|
||||
exp = nil
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -636,7 +636,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) {
|
|||
buf := make([]tsm1.BooleanValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
// Search for an entry that exists in the second file
|
||||
values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err := c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -660,7 +660,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) {
|
|||
|
||||
// Check that calling Next will dedupe points
|
||||
c.Next()
|
||||
values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err = c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -679,7 +679,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err = c.ReadBooleanBlock(&buf)
|
||||
exp = nil
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -711,7 +711,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) {
|
|||
buf := make([]tsm1.StringValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
// Search for an entry that exists in the second file
|
||||
values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err := c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -735,7 +735,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) {
|
|||
|
||||
// Check that calling Next will dedupe points
|
||||
c.Next()
|
||||
values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err = c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -754,7 +754,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err = c.ReadStringBlock(&buf)
|
||||
exp = nil
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -784,7 +784,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 3, true)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -801,7 +801,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -840,7 +840,7 @@ func TestFileStore_SeekToAsc_End(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 2, true)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -879,7 +879,7 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, false)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -918,7 +918,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 2, false)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -936,7 +936,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -977,7 +977,7 @@ func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 5, false)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -998,7 +998,7 @@ func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1042,7 +1042,7 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.IntegerValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 5, false)
|
||||
values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err := c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1063,7 +1063,7 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err = c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1105,7 +1105,7 @@ func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.BooleanValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 5, false)
|
||||
values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err := c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1126,7 +1126,7 @@ func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err = c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1168,7 +1168,7 @@ func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.StringValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 5, false)
|
||||
values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err := c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1189,7 +1189,7 @@ func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err = c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1229,7 +1229,7 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 4, false)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1268,7 +1268,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 10, false)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1289,7 +1289,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1311,7 +1311,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1332,7 +1332,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1365,7 +1365,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.IntegerValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 11, false)
|
||||
values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err := c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1388,7 +1388,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err = c.ReadIntegerBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1409,7 +1409,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err = c.ReadIntegerBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1442,7 +1442,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.BooleanValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 11, false)
|
||||
values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err := c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1463,7 +1463,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err = c.ReadBooleanBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1485,7 +1485,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err = c.ReadBooleanBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1506,7 +1506,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err = c.ReadBooleanBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1539,7 +1539,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.StringValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 11, false)
|
||||
values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err := c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1560,7 +1560,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err = c.ReadStringBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1582,7 +1582,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err = c.ReadStringBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1603,7 +1603,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err = c.ReadStringBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1639,7 +1639,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) {
|
|||
// Search for an entry that exists in the second file
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 3, false)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1659,7 +1659,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) {
|
|||
}
|
||||
}
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1680,7 +1680,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) {
|
|||
}
|
||||
|
||||
c.Next()
|
||||
values, err = c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err = c.ReadFloatBlock(&buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
|
@ -1712,7 +1712,7 @@ func TestFileStore_SeekToDesc_End(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 2, false)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1756,7 +1756,7 @@ func TestKeyCursor_TombstoneRange(t *testing.T) {
|
|||
c := fs.KeyCursor("cpu", 0, true)
|
||||
expValues := []int{0, 2}
|
||||
for _, v := range expValues {
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1799,7 +1799,7 @@ func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
values, err := c.ReadFloatBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1843,7 +1843,7 @@ func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.IntegerValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
values, err := c.ReadIntegerBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1887,7 +1887,7 @@ func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.StringValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
values, err := c.ReadStringBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -1931,7 +1931,7 @@ func TestKeyCursor_TombstoneRange_PartialBoolean(t *testing.T) {
|
|||
|
||||
buf := make([]tsm1.BooleanValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
values, err := c.ReadBooleanBlock(&buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
@ -2064,23 +2064,21 @@ func TestFileStore_Replace(t *testing.T) {
|
|||
}
|
||||
|
||||
// There should be two blocks (1 in each file)
|
||||
tdec := &tsm1.TimeDecoder{}
|
||||
vdec := &tsm1.FloatDecoder{}
|
||||
cur.Next()
|
||||
buf := make([]tsm1.FloatValue, 10)
|
||||
values, err := cur.ReadFloatBlock(tdec, vdec, &buf)
|
||||
values, err := cur.ReadFloatBlock(&buf)
|
||||
if got, exp := len(values), 1; got != exp {
|
||||
t.Fatalf("value len mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
cur.Next()
|
||||
values, err = cur.ReadFloatBlock(tdec, vdec, &buf)
|
||||
values, err = cur.ReadFloatBlock(&buf)
|
||||
if got, exp := len(values), 1; got != exp {
|
||||
t.Fatalf("value len mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
// No more blocks for this cursor
|
||||
cur.Next()
|
||||
values, err = cur.ReadFloatBlock(tdec, vdec, &buf)
|
||||
values, err = cur.ReadFloatBlock(&buf)
|
||||
if got, exp := len(values), 0; got != exp {
|
||||
t.Fatalf("value len mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
|
|
@ -311,8 +311,6 @@ type floatAscendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec FloatDecoder
|
||||
buf []FloatValue
|
||||
values []FloatValue
|
||||
pos int
|
||||
|
@ -330,7 +328,7 @@ func newFloatAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCu
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]FloatValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -412,7 +410,7 @@ func (c *floatAscendingCursor) nextTSM() {
|
|||
c.tsm.pos++
|
||||
if c.tsm.pos >= len(c.tsm.values) {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -427,8 +425,6 @@ type floatDescendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec FloatDecoder
|
||||
buf []FloatValue
|
||||
values []FloatValue
|
||||
pos int
|
||||
|
@ -449,7 +445,7 @@ func newFloatDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyC
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]FloatValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -534,7 +530,7 @@ func (c *floatDescendingCursor) nextTSM() {
|
|||
c.tsm.pos--
|
||||
if c.tsm.pos < 0 {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -758,8 +754,6 @@ type integerAscendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec IntegerDecoder
|
||||
buf []IntegerValue
|
||||
values []IntegerValue
|
||||
pos int
|
||||
|
@ -777,7 +771,7 @@ func newIntegerAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]IntegerValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -859,7 +853,7 @@ func (c *integerAscendingCursor) nextTSM() {
|
|||
c.tsm.pos++
|
||||
if c.tsm.pos >= len(c.tsm.values) {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -874,8 +868,6 @@ type integerDescendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec IntegerDecoder
|
||||
buf []IntegerValue
|
||||
values []IntegerValue
|
||||
pos int
|
||||
|
@ -896,7 +888,7 @@ func newIntegerDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Ke
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]IntegerValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -981,7 +973,7 @@ func (c *integerDescendingCursor) nextTSM() {
|
|||
c.tsm.pos--
|
||||
if c.tsm.pos < 0 {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -1205,8 +1197,6 @@ type stringAscendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec StringDecoder
|
||||
buf []StringValue
|
||||
values []StringValue
|
||||
pos int
|
||||
|
@ -1224,7 +1214,7 @@ func newStringAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyC
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]StringValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -1306,7 +1296,7 @@ func (c *stringAscendingCursor) nextTSM() {
|
|||
c.tsm.pos++
|
||||
if c.tsm.pos >= len(c.tsm.values) {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -1321,8 +1311,6 @@ type stringDescendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec StringDecoder
|
||||
buf []StringValue
|
||||
values []StringValue
|
||||
pos int
|
||||
|
@ -1343,7 +1331,7 @@ func newStringDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]StringValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -1428,7 +1416,7 @@ func (c *stringDescendingCursor) nextTSM() {
|
|||
c.tsm.pos--
|
||||
if c.tsm.pos < 0 {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -1652,8 +1640,6 @@ type booleanAscendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec BooleanDecoder
|
||||
buf []BooleanValue
|
||||
values []BooleanValue
|
||||
pos int
|
||||
|
@ -1671,7 +1657,7 @@ func newBooleanAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Key
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]BooleanValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -1753,7 +1739,7 @@ func (c *booleanAscendingCursor) nextTSM() {
|
|||
c.tsm.pos++
|
||||
if c.tsm.pos >= len(c.tsm.values) {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -1768,8 +1754,6 @@ type booleanDescendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec BooleanDecoder
|
||||
buf []BooleanValue
|
||||
values []BooleanValue
|
||||
pos int
|
||||
|
@ -1790,7 +1774,7 @@ func newBooleanDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *Ke
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]BooleanValue, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -1875,7 +1859,7 @@ func (c *booleanDescendingCursor) nextTSM() {
|
|||
c.tsm.pos--
|
||||
if c.tsm.pos < 0 {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -307,8 +307,6 @@ type {{.name}}AscendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec {{.Name}}Decoder
|
||||
buf []{{.Name}}Value
|
||||
values []{{.Name}}Value
|
||||
pos int
|
||||
|
@ -326,7 +324,7 @@ func new{{.Name}}AscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *K
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]{{.Name}}Value, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -408,7 +406,7 @@ func (c *{{.name}}AscendingCursor) nextTSM() {
|
|||
c.tsm.pos++
|
||||
if c.tsm.pos >= len(c.tsm.values) {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -423,8 +421,6 @@ type {{.name}}DescendingCursor struct {
|
|||
}
|
||||
|
||||
tsm struct {
|
||||
tdec TimeDecoder
|
||||
vdec {{.Name}}Decoder
|
||||
buf []{{.Name}}Value
|
||||
values []{{.Name}}Value
|
||||
pos int
|
||||
|
@ -445,7 +441,7 @@ func new{{.Name}}DescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *
|
|||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.buf = make([]{{.Name}}Value, 10)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
|
||||
return c.tsm.values[i].UnixNano() >= seek
|
||||
})
|
||||
|
@ -530,7 +526,7 @@ func (c *{{.name}}DescendingCursor) nextTSM() {
|
|||
c.tsm.pos--
|
||||
if c.tsm.pos < 0 {
|
||||
c.tsm.keyCursor.Next()
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.buf)
|
||||
if len(c.tsm.values) == 0 {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -163,10 +163,10 @@ type blockAccessor interface {
|
|||
read(key string, timestamp int64) ([]Value, error)
|
||||
readAll(key string) ([]Value, error)
|
||||
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
|
||||
readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, fdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error)
|
||||
readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error)
|
||||
readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error)
|
||||
readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error)
|
||||
readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
|
||||
readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
|
||||
readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
|
||||
readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
|
||||
readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
|
||||
rename(path string) error
|
||||
path() string
|
||||
|
@ -254,30 +254,30 @@ func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
|
|||
return v, err
|
||||
}
|
||||
|
||||
func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, vals *[]FloatValue) ([]FloatValue, error) {
|
||||
func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readFloatBlock(entry, tdec, vdec, vals)
|
||||
v, err := t.accessor.readFloatBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, vals *[]IntegerValue) ([]IntegerValue, error) {
|
||||
func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readIntegerBlock(entry, tdec, vdec, vals)
|
||||
v, err := t.accessor.readIntegerBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, vals *[]StringValue) ([]StringValue, error) {
|
||||
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readStringBlock(entry, tdec, vdec, vals)
|
||||
v, err := t.accessor.readStringBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, vals *[]BooleanValue) ([]BooleanValue, error) {
|
||||
func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readBooleanBlock(entry, tdec, vdec, vals)
|
||||
v, err := t.accessor.readBooleanBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
@ -1041,7 +1041,7 @@ func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, er
|
|||
return values, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) {
|
||||
func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
|
||||
m.mu.RLock()
|
||||
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
|
@ -1049,7 +1049,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec
|
|||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
|
||||
a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
|
@ -1059,7 +1059,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec
|
|||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) {
|
||||
func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
|
||||
m.mu.RLock()
|
||||
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
|
@ -1067,7 +1067,7 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vd
|
|||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
|
||||
a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
|
@ -1077,7 +1077,7 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vd
|
|||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) {
|
||||
func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
|
||||
m.mu.RLock()
|
||||
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
|
@ -1085,7 +1085,7 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vde
|
|||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
|
||||
a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
|
@ -1095,7 +1095,7 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vde
|
|||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) {
|
||||
func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
|
||||
m.mu.RLock()
|
||||
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
|
@ -1103,7 +1103,7 @@ func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vd
|
|||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
|
||||
a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue