Merge pull request #4825 from influxdb/jw-tsm2

Support typed value decoding
pull/4840/head
Jason Wilder 2015-11-17 23:30:05 -07:00
commit d1b6e21e25
8 changed files with 702 additions and 158 deletions

View File

@ -354,7 +354,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
encoded := buf[9:]
var v []tsm1.Value
err := tsm1.DecodeBlock(buf, &v)
v, err := tsm1.DecodeBlock(buf, v)
if err != nil {
fmt.Printf("error: %v\n", err.Error())
os.Exit(1)

View File

@ -39,12 +39,12 @@ The index structure can provide efficient access to all blocks as well as the ab
_TBD: The block length stored in the block data could probably be dropped since we store it in the index._
```
┌──────────────────────────────────────────────────────────────────────────┐
│ Index │
├─────────┬─────────┬───────────────┬─────────┬─────────┬─────────┬──────┤
│ Key Len │ Key │ Count │Min Time │Max Time │ Offset │ Size ...
│ 2 bytes │ N bytes │2 bytes│ 8 bytes │ 8 bytes │ 8 bytes │ 4 bytes │
└─────────┴─────────┴───────────────┴─────────┴─────────┴─────────┴──────┘
┌────────────────────────────────────────────────────────────────────────────
Index
├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘
```
The last section is the footer that stores the offset of the start of the index.

View File

@ -474,7 +474,7 @@ func (c *cursor) decodeBlock(position uint32) {
length := c.blockLength(position)
block := c.f.mmap[position+blockHeaderSize : position+blockHeaderSize+length]
c.vals = c.vals[:0]
_ = DecodeBlock(block, &c.vals)
c.vals, _ = DecodeBlock(block, c.vals)
// only adavance the position if we're asceending.
// Descending queries use the blockPositions

View File

@ -45,12 +45,12 @@ timestamp as well as where that block resides and how much data to read to
retrieve the block. If we know we need to read all or multiple blocks in a
file, we can use the size to determine how much to read in a given IO.
Index
Key Len Key Count Min Time Max Time Offset Size ...
2 bytes N bytes 2 bytes 8 bytes 8 bytes 8 bytes 4 bytes
Index
Key Len Key Type Count Min Time Max Time Offset Size ...
2 bytes N bytes 1 byte2 bytes 8 bytes 8 bytes 8 bytes 4 bytes
The last section is the footer that stores the offset of the start of the index.
@ -86,6 +86,9 @@ const (
// Size in bytes used to store the count of index entries for a key
indexCountSize = 2
// Size in bytes used to store the type of block encoded
indexTypeSize = 1
// Max number of blocks for a given key that can exist in a single file
maxIndexEntries = (1 << (indexCountSize * 8)) - 1
)
@ -113,7 +116,7 @@ type TSMWriter interface {
type TSMIndex interface {
// Add records a new block entry for a key in the index.
Add(key string, minTime, maxTime time.Time, offset int64, size uint32)
Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32)
// Delete removes the given key from the index.
Delete(key string)
@ -136,6 +139,11 @@ type TSMIndex interface {
// Keys returns the unique set of keys in the index.
Keys() []string
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist,
// an error is returned.
Type(key string) (byte, error)
// MarshalBinary returns a byte slice encoded version of the index.
MarshalBinary() ([]byte, error)
@ -177,7 +185,7 @@ func (e *IndexEntry) Contains(t time.Time) bool {
func NewDirectIndex() TSMIndex {
return &directIndex{
blocks: map[string]indexEntries{},
blocks: map[string]*indexEntries{},
}
}
@ -186,14 +194,21 @@ func NewDirectIndex() TSMIndex {
type directIndex struct {
mu sync.RWMutex
blocks map[string]indexEntries
blocks map[string]*indexEntries
}
func (d *directIndex) Add(key string, minTime, maxTime time.Time, offset int64, size uint32) {
func (d *directIndex) Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32) {
d.mu.Lock()
defer d.mu.Unlock()
d.blocks[key] = append(d.blocks[key], &IndexEntry{
entries := d.blocks[key]
if entries == nil {
entries = &indexEntries{
Type: blockType,
}
d.blocks[key] = entries
}
entries.Append(&IndexEntry{
MinTime: minTime,
MaxTime: maxTime,
Offset: offset,
@ -205,7 +220,11 @@ func (d *directIndex) Entries(key string) []*IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
return d.blocks[key]
entries := d.blocks[key]
if entries == nil {
return nil
}
return d.blocks[key].entries
}
func (d *directIndex) Entry(key string, t time.Time) *IndexEntry {
@ -221,6 +240,16 @@ func (d *directIndex) Entry(key string, t time.Time) *IndexEntry {
return nil
}
func (d *directIndex) Type(key string) (byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
entries := d.blocks[key]
if entries != nil {
return entries.Type, nil
}
return 0, fmt.Errorf("key does not exist: %v", key)
}
func (d *directIndex) Contains(key string) bool {
return len(d.Entries(key)) > 0
}
@ -248,8 +277,13 @@ func (d *directIndex) Keys() []string {
return keys
}
func (d *directIndex) addEntries(key string, entries indexEntries) {
d.blocks[key] = append(d.blocks[key], entries...)
func (d *directIndex) addEntries(key string, entries *indexEntries) {
existing := d.blocks[key]
if existing == nil {
d.blocks[key] = entries
return
}
existing.Append(entries.entries...)
}
func (d *directIndex) Write(w io.Writer) error {
@ -284,9 +318,9 @@ func (d *directIndex) MarshalBinary() ([]byte, error) {
for _, key := range keys {
entries := d.blocks[key]
if len(entries) > maxIndexEntries {
if entries.Len() > maxIndexEntries {
return nil, fmt.Errorf("key '%s' exceeds max index entries: %d > %d",
key, len(entries), maxIndexEntries)
key, entries.Len(), maxIndexEntries)
}
sort.Sort(entries)
@ -294,16 +328,18 @@ func (d *directIndex) MarshalBinary() ([]byte, error) {
b = append(b, u16tob(uint16(len(key)))...)
b = append(b, key...)
// Append the block type
b = append(b, entries.Type)
// Append the index block count
b = append(b, u16tob(uint16(len(entries)))...)
b = append(b, u16tob(uint16(entries.Len()))...)
// Append each index entry for all blocks for this key
for _, entry := range entries {
b = append(b, u64tob(uint64(entry.MinTime.UnixNano()))...)
b = append(b, u64tob(uint64(entry.MaxTime.UnixNano()))...)
b = append(b, u64tob(uint64(entry.Offset))...)
b = append(b, u32tob(entry.Size)...)
eb, err := entries.MarshalBinary()
if err != nil {
return nil, err
}
b = append(b, eb...)
}
return b, nil
}
@ -314,13 +350,13 @@ func (d *directIndex) UnmarshalBinary(b []byte) error {
var pos int
for pos < len(b) {
n, key, err := d.readKey(b[pos:])
n, key, err := readKey(b[pos:])
if err != nil {
return fmt.Errorf("readIndex: read key error: %v", err)
}
pos += n
n, entries, err := d.readEntries(b[pos:])
n, entries, err := readEntries(b[pos:])
if err != nil {
return fmt.Errorf("readIndex: read entries error: %v", err)
}
@ -331,31 +367,6 @@ func (d *directIndex) UnmarshalBinary(b []byte) error {
return nil
}
func (d *directIndex) readKey(b []byte) (n int, key string, err error) {
// 2 byte size of key
n, size := 2, int(btou16(b[:2]))
// N byte key
key = string(b[n : n+size])
n += len(key)
return
}
func (d *directIndex) readEntries(b []byte) (n int, entries indexEntries, err error) {
// 2 byte count of index entries
n, count := indexCountSize, int(btou16(b[:2]))
for i := 0; i < count; i++ {
ie := &IndexEntry{}
if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize : i*indexEntrySize+indexCountSize+indexEntrySize]); err != nil {
return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err)
}
entries = append(entries, ie)
n += indexEntrySize
}
return
}
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
@ -413,12 +424,13 @@ func NewIndirectIndex() TSMIndex {
}
// Add records a new block entry for a key in the index.
func (d *indirectIndex) Add(key string, minTime, maxTime time.Time, offset int64, size uint32) {
func (d *indirectIndex) Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32) {
panic("unsupported operation")
}
// Entries returns all index entries for a key.
func (d *indirectIndex) Entries(key string) []*IndexEntry {
// search returns the index of i in offsets for where key is located. If key is not
// in the index, len(offsets) is returned.
func (d *indirectIndex) search(key string) int {
// We use a binary search across our indirect offsets (pointers to all the keys
// in the index slice).
i := sort.Search(len(d.offsets), func(i int) bool {
@ -438,7 +450,31 @@ func (d *indirectIndex) Entries(key string) []*IndexEntry {
// See if we might have found the right index
if i < len(d.offsets) {
ofs := d.offsets[i]
n, k, err := d.readKey(d.b[ofs:])
_, k, err := readKey(d.b[ofs:])
if err != nil {
panic(fmt.Sprintf("error reading key: %v", err))
}
// The search may have returned an i == 0 which could indicated that the value
// searched should be inserted at postion 0. Make sure the key in the index
// matches the search value.
if k != key {
return len(d.offsets)
}
return int(ofs)
}
// The key is not in the index. i is the index where it would be inserted so return
// a value outside our offset range.
return len(d.offsets)
}
// Entries returns all index entries for a key.
func (d *indirectIndex) Entries(key string) []*IndexEntry {
ofs := d.search(key)
if ofs < len(d.offsets) {
n, k, err := readKey(d.b[ofs:])
if err != nil {
panic(fmt.Sprintf("error reading key: %v", err))
}
@ -451,14 +487,13 @@ func (d *indirectIndex) Entries(key string) []*IndexEntry {
}
// Read and return all the entries
ofs += int32(n)
_, entries, err := d.readEntries(d.b[ofs:])
ofs += n
_, entries, err := readEntries(d.b[ofs:])
if err != nil {
panic(fmt.Sprintf("error reading entries: %v", err))
}
return entries
return entries.entries
}
// The key is not in the index. i is the index where it would be inserted.
@ -480,7 +515,7 @@ func (d *indirectIndex) Entry(key string, timestamp time.Time) *IndexEntry {
func (d *indirectIndex) Keys() []string {
var keys []string
for offset := range d.offsets {
_, key, _ := d.readKey(d.b[offset:])
_, key, _ := readKey(d.b[offset:])
keys = append(keys, key)
}
return keys
@ -489,7 +524,7 @@ func (d *indirectIndex) Keys() []string {
func (d *indirectIndex) Delete(key string) {
var offsets []int32
for offset := range d.offsets {
_, indexKey, _ := d.readKey(d.b[offset:])
_, indexKey, _ := readKey(d.b[offset:])
if key == indexKey {
continue
}
@ -506,6 +541,20 @@ func (d *indirectIndex) ContainsValue(key string, timestamp time.Time) bool {
return d.Entry(key, timestamp) != nil
}
func (d *indirectIndex) Type(key string) (byte, error) {
ofs := d.search(key)
if ofs < len(d.offsets) {
n, _, err := readKey(d.b[ofs:])
if err != nil {
panic(fmt.Sprintf("error reading key: %v", err))
}
ofs += n
return d.b[ofs], nil
}
return 0, fmt.Errorf("key does not exist: %v", key)
}
// MarshalBinary returns a byte slice encoded version of the index.
func (d *indirectIndex) MarshalBinary() ([]byte, error) {
return d.b, nil
@ -532,6 +581,9 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
// Skip over the key
i += keyLen
// Skip over the block type
i += indexTypeSize
// Count of all the index blocks for this key
count := int32(btou16(b[i : i+2]))
@ -545,31 +597,6 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
return nil
}
func (d *indirectIndex) readKey(b []byte) (n int, key string, err error) {
// 2 byte size of key
n, size := 2, int(btou16(b[:2]))
// N byte key
key = string(b[n : n+size])
n += len(key)
return
}
func (d *indirectIndex) readEntries(b []byte) (n int, entries indexEntries, err error) {
// 2 byte count of index entries
n, count := indexCountSize, int(btou16(b[:2]))
for i := 0; i < count; i++ {
ie := &IndexEntry{}
if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize : i*indexEntrySize+indexCountSize+indexEntrySize]); err != nil {
return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err)
}
entries = append(entries, ie)
n += indexEntrySize
}
return
}
// tsmWriter writes keys and values in the TSM format
type tsmWriter struct {
w io.Writer
@ -584,7 +611,7 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) {
}
index := &directIndex{
blocks: map[string]indexEntries{},
blocks: map[string]*indexEntries{},
}
return &tsmWriter{w: w, index: index, n: int64(n)}, nil
@ -603,8 +630,12 @@ func (t *tsmWriter) Write(key string, values Values) error {
return err
}
blockType, err := BlockType(block)
if err != nil {
return err
}
// Record this block in index
t.index.Add(key, values[0].Time(), values[len(values)-1].Time(), t.n, uint32(n))
t.index.Add(key, blockType, values[0].Time(), values[len(values)-1].Time(), t.n, uint32(n))
// Increment file position pointer
t.n += int64(n)
@ -689,7 +720,7 @@ func (t *tsmReader) init() error {
b = make([]byte, t.indexEnd-t.indexStart)
t.index = &directIndex{
blocks: map[string]indexEntries{},
blocks: map[string]*indexEntries{},
}
_, err = t.r.Read(b)
if err != nil {
@ -758,7 +789,7 @@ func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) {
//TODO: Validate checksum
var values []Value
err = DecodeBlock(b[4:n], &values)
values, err = DecodeBlock(b[4:n], values)
if err != nil {
return nil, err
}
@ -804,7 +835,7 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) {
//TODO: Validate checksum
temp = temp[:0]
err = DecodeBlock(b[4:n], &temp)
temp, err = DecodeBlock(b[4:n], temp)
if err != nil {
return nil, err
}
@ -814,6 +845,10 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) {
return values, nil
}
func (t *tsmReader) Type(key string) (byte, error) {
return t.index.Type(key)
}
func (t *tsmReader) Close() error {
t.mu.Lock()
defer t.mu.Unlock()
@ -844,11 +879,64 @@ func (t *tsmReader) Delete(key string) error {
return nil
}
type indexEntries []*IndexEntry
type indexEntries struct {
Type byte
entries []*IndexEntry
}
func (a indexEntries) Len() int { return len(a) }
func (a indexEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a indexEntries) Less(i, j int) bool { return a[i].MinTime.UnixNano() < a[j].MinTime.UnixNano() }
func (a *indexEntries) Len() int { return len(a.entries) }
func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] }
func (a *indexEntries) Less(i, j int) bool {
return a.entries[i].MinTime.UnixNano() < a.entries[j].MinTime.UnixNano()
}
func (a *indexEntries) Append(entry ...*IndexEntry) {
a.entries = append(a.entries, entry...)
}
func (a *indexEntries) MarshalBinary() (b []byte, err error) {
for _, entry := range a.entries {
b = append(b, u64tob(uint64(entry.MinTime.UnixNano()))...)
b = append(b, u64tob(uint64(entry.MaxTime.UnixNano()))...)
b = append(b, u64tob(uint64(entry.Offset))...)
b = append(b, u32tob(entry.Size)...)
}
return b, nil
}
func readKey(b []byte) (n int, key string, err error) {
// 2 byte size of key
n, size := 2, int(btou16(b[:2]))
// N byte key
key = string(b[n : n+size])
n += len(key)
return
}
func readEntries(b []byte) (n int, entries *indexEntries, err error) {
// 1 byte block type
blockType := b[n]
entries = &indexEntries{
Type: blockType,
entries: []*IndexEntry{},
}
n++
// 2 byte count of index entries
count := int(btou16(b[n : n+indexCountSize]))
n += indexCountSize
for i := 0; i < count; i++ {
ie := &IndexEntry{}
if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize+indexTypeSize : i*indexEntrySize+indexCountSize+indexEntrySize+indexTypeSize]); err != nil {
return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err)
}
entries.Append(ie)
n += indexEntrySize
}
return
}
func u16tob(v uint16) []byte {
b := make([]byte, 2)

View File

@ -360,9 +360,9 @@ func TestTSMWriter_Read_Multiple(t *testing.T) {
func TestIndirectIndex_Entries(t *testing.T) {
index := tsm1.NewDirectIndex()
index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 100)
index.Add("cpu", time.Unix(2, 0), time.Unix(3, 0), 20, 200)
index.Add("mem", time.Unix(0, 0), time.Unix(1, 0), 10, 100)
index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 100)
index.Add("cpu", tsm1.BlockFloat64, time.Unix(2, 0), time.Unix(3, 0), 20, 200)
index.Add("mem", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 100)
b, err := index.MarshalBinary()
if err != nil {
@ -402,8 +402,8 @@ func TestIndirectIndex_Entries(t *testing.T) {
func TestIndirectIndex_Entries_NonExistent(t *testing.T) {
index := tsm1.NewDirectIndex()
index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 100)
index.Add("cpu", time.Unix(2, 0), time.Unix(3, 0), 20, 200)
index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 100)
index.Add("cpu", tsm1.BlockFloat64, time.Unix(2, 0), time.Unix(3, 0), 20, 200)
b, err := index.MarshalBinary()
if err != nil {
@ -428,7 +428,7 @@ func TestIndirectIndex_Entries_NonExistent(t *testing.T) {
func TestIndirectIndex_MaxBlocks(t *testing.T) {
index := tsm1.NewDirectIndex()
for i := 0; i < 1<<16; i++ {
index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 20)
index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 20)
}
if _, err := index.MarshalBinary(); err == nil {
@ -438,11 +438,32 @@ func TestIndirectIndex_MaxBlocks(t *testing.T) {
}
}
func TestIndirectIndex_Type(t *testing.T) {
index := tsm1.NewDirectIndex()
index.Add("cpu", tsm1.BlockInt64, time.Unix(0, 0), time.Unix(1, 0), 10, 20)
b, err := index.MarshalBinary()
ind := tsm1.NewIndirectIndex()
if err := ind.UnmarshalBinary(b); err != nil {
fatal(t, "unmarshal binary", err)
}
typ, err := ind.Type("cpu")
if err != nil {
fatal(t, "reading type", err)
}
if got, exp := typ, tsm1.BlockInt64; got != exp {
t.Fatalf("type mismatch: got %v, exp %v", got, exp)
}
}
func TestIndirectIndex_Keys(t *testing.T) {
index := tsm1.NewDirectIndex()
index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 20)
index.Add("mem", time.Unix(0, 0), time.Unix(1, 0), 10, 20)
index.Add("cpu", time.Unix(1, 0), time.Unix(2, 0), 20, 30)
index.Add("cpu", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 20)
index.Add("mem", tsm1.BlockFloat64, time.Unix(0, 0), time.Unix(1, 0), 10, 20)
index.Add("cpu", tsm1.BlockFloat64, time.Unix(1, 0), time.Unix(2, 0), 20, 30)
keys := index.Keys()
@ -459,5 +480,35 @@ func TestIndirectIndex_Keys(t *testing.T) {
if got, exp := keys[1], "mem"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
}
func TestTSMWriter_Type(t *testing.T) {
var b bytes.Buffer
w, err := tsm1.NewTSMWriter(&b)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values := []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), int64(1))}
if err := w.Write("cpu", values); err != nil {
t.Fatalf("unexpeted error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpeted error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
typ, err := r.Type("cpu")
if err != nil {
fatal(t, "reading type", err)
}
if got, exp := typ, tsm1.BlockInt64; got != exp {
t.Fatalf("type mismatch: got %v, exp %v", got, exp)
}
}

View File

@ -11,16 +11,16 @@ import (
const (
// BlockFloat64 designates a block encodes float64 values
BlockFloat64 = 0
BlockFloat64 = byte(0)
// BlockInt64 designates a block encodes int64 values
BlockInt64 = 1
BlockInt64 = byte(1)
// BlockBool designates a block encodes bool values
BlockBool = 2
BlockBool = byte(2)
// BlockString designates a block encodes string values
BlockString = 3
BlockString = byte(3)
// encodedBlockHeaderSize is the size of the header for an encoded block. The first 8 bytes
// are the minimum timestamp of the block. The next byte is a block encoding type indicator.
@ -98,23 +98,69 @@ func (a Values) Encode(buf []byte) ([]byte, error) {
return nil, fmt.Errorf("unsupported value type %T", a[0])
}
// BlockType returns the type of value encoded in a block or an error
// if the block type is unknown.
func BlockType(block []byte) (byte, error) {
blockType := block[8]
switch blockType {
case BlockFloat64, BlockInt64, BlockBool, BlockString:
return blockType, nil
default:
return 0, fmt.Errorf("unknown block type: %d", blockType)
}
}
// DecodeBlock takes a byte array and will decode into values of the appropriate type
// based on the block
func DecodeBlock(block []byte, vals *[]Value) error {
// based on the block.
func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
if len(block) <= encodedBlockHeaderSize {
panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
}
blockType := block[8]
blockType, err := BlockType(block)
if err != nil {
return nil, err
}
switch blockType {
case BlockFloat64:
return decodeFloatBlock(block, vals)
decoded, err := DecodeFloatBlock(block, nil)
if len(vals) < len(decoded) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = decoded[i]
}
return vals, err
case BlockInt64:
return decodeInt64Block(block, vals)
decoded, err := DecodeInt64Block(block, nil)
if len(vals) < len(decoded) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = decoded[i]
}
return vals, err
case BlockBool:
return decodeBoolBlock(block, vals)
decoded, err := DecodeBoolBlock(block, nil)
if len(vals) < len(decoded) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = decoded[i]
}
return vals, err
case BlockString:
return decodeStringBlock(block, vals)
decoded, err := DecodeStringBlock(block, nil)
if len(vals) < len(decoded) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = decoded[i]
}
return vals, err
default:
panic(fmt.Sprintf("unknown block type: %d", blockType))
}
@ -203,14 +249,14 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
return block, nil
}
func decodeFloatBlock(block []byte, a *[]Value) error {
func DecodeFloatBlock(block []byte, a []*FloatValue) ([]*FloatValue, error) {
// The first 8 bytes is the minimum timestamp of the block
block = block[8:]
// Block type is the next block, make sure we actually have a float block
blockType := block[0]
if blockType != BlockFloat64 {
return fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType)
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType)
}
block = block[1:]
@ -220,26 +266,33 @@ func decodeFloatBlock(block []byte, a *[]Value) error {
dec := NewTimeDecoder(tb)
iter, err := NewFloatDecoder(vb)
if err != nil {
return err
return nil, err
}
// Decode both a timestamp and value
i := 0
for dec.Next() && iter.Next() {
ts := dec.Read()
v := iter.Values()
*a = append(*a, &FloatValue{ts, v})
if i < len(a) && a[i] != nil {
a[i].time = ts
a[i].value = v
} else {
a = append(a, &FloatValue{ts, v})
}
i++
}
// Did timestamp decoding have an error?
if dec.Error() != nil {
return dec.Error()
return nil, dec.Error()
}
// Did float decoding have an error?
if iter.Error() != nil {
return iter.Error()
return nil, iter.Error()
}
return nil
return a[:i], nil
}
type BoolValue struct {
@ -300,14 +353,14 @@ func encodeBoolBlock(buf []byte, values []Value) ([]byte, error) {
return block, nil
}
func decodeBoolBlock(block []byte, a *[]Value) error {
func DecodeBoolBlock(block []byte, a []*BoolValue) ([]*BoolValue, error) {
// The first 8 bytes is the minimum timestamp of the block
block = block[8:]
// Block type is the next block, make sure we actually have a float block
blockType := block[0]
if blockType != BlockBool {
return fmt.Errorf("invalid block type: exp %d, got %d", BlockBool, blockType)
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockBool, blockType)
}
block = block[1:]
@ -318,22 +371,29 @@ func decodeBoolBlock(block []byte, a *[]Value) error {
vdec := NewBoolDecoder(vb)
// Decode both a timestamp and value
i := 0
for dec.Next() && vdec.Next() {
ts := dec.Read()
v := vdec.Read()
*a = append(*a, &BoolValue{ts, v})
if i < len(a) && a[i] != nil {
a[i].time = ts
a[i].value = v
} else {
a = append(a, &BoolValue{ts, v})
}
i++
}
// Did timestamp decoding have an error?
if dec.Error() != nil {
return dec.Error()
return nil, dec.Error()
}
// Did bool decoding have an error?
if vdec.Error() != nil {
return vdec.Error()
return nil, vdec.Error()
}
return nil
return a[:i], nil
}
type Int64Value struct {
@ -383,13 +443,13 @@ func encodeInt64Block(buf []byte, values []Value) ([]byte, error) {
return append(block, packBlock(tb, vb)...), nil
}
func decodeInt64Block(block []byte, a *[]Value) error {
func DecodeInt64Block(block []byte, a []*Int64Value) ([]*Int64Value, error) {
// slice off the first 8 bytes (min timestmap for the block)
block = block[8:]
blockType := block[0]
if blockType != BlockInt64 {
return fmt.Errorf("invalid block type: exp %d, got %d", BlockInt64, blockType)
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockInt64, blockType)
}
block = block[1:]
@ -402,22 +462,29 @@ func decodeInt64Block(block []byte, a *[]Value) error {
vDec := NewInt64Decoder(vb)
// Decode both a timestamp and value
i := 0
for tsDec.Next() && vDec.Next() {
ts := tsDec.Read()
v := vDec.Read()
*a = append(*a, &Int64Value{ts, v})
if i < len(a) && a[i] != nil {
a[i].time = ts
a[i].value = v
} else {
a = append(a, &Int64Value{ts, v})
}
i++
}
// Did timestamp decoding have an error?
if tsDec.Error() != nil {
return tsDec.Error()
return nil, tsDec.Error()
}
// Did int64 decoding have an error?
if vDec.Error() != nil {
return vDec.Error()
return nil, vDec.Error()
}
return nil
return a[:i], nil
}
type StringValue struct {
@ -467,13 +534,13 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
return append(block, packBlock(tb, vb)...), nil
}
func decodeStringBlock(block []byte, a *[]Value) error {
func DecodeStringBlock(block []byte, a []*StringValue) ([]*StringValue, error) {
// slice off the first 8 bytes (min timestmap for the block)
block = block[8:]
blockType := block[0]
if blockType != BlockString {
return fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType)
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockString, blockType)
}
block = block[1:]
@ -485,26 +552,33 @@ func decodeStringBlock(block []byte, a *[]Value) error {
tsDec := NewTimeDecoder(tb)
vDec, err := NewStringDecoder(vb)
if err != nil {
return err
return nil, err
}
// Decode both a timestamp and value
i := 0
for tsDec.Next() && vDec.Next() {
ts := tsDec.Read()
v := vDec.Read()
*a = append(*a, &StringValue{ts, v})
if i < len(a) && a[i] != nil {
a[i].time = ts
a[i].value = v
} else {
a = append(a, &StringValue{ts, v})
}
i++
}
// Did timestamp decoding have an error?
if tsDec.Error() != nil {
return tsDec.Error()
return nil, tsDec.Error()
}
// Did string decoding have an error?
if vDec.Error() != nil {
return vDec.Error()
return nil, vDec.Error()
}
return nil
return a[:i], nil
}
func packBlockHeader(firstTime time.Time, blockType byte) []byte {

View File

@ -24,7 +24,8 @@ func TestEncoding_FloatBlock(t *testing.T) {
}
var decodedValues []tsm1.Value
if err := tsm1.DecodeBlock(b, &decodedValues); err != nil {
decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
@ -45,7 +46,8 @@ func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
}
var decodedValues []tsm1.Value
if err := tsm1.DecodeBlock(b, &decodedValues); err != nil {
decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
@ -68,7 +70,8 @@ func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) {
}
var decodedValues []tsm1.Value
if err := tsm1.DecodeBlock(b, &decodedValues); err != nil {
decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
@ -91,7 +94,8 @@ func TestEncoding_IntBlock_Basic(t *testing.T) {
}
var decodedValues []tsm1.Value
if err := tsm1.DecodeBlock(b, &decodedValues); err != nil {
decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
@ -129,7 +133,8 @@ func TestEncoding_IntBlock_Negatives(t *testing.T) {
}
var decodedValues []tsm1.Value
if err := tsm1.DecodeBlock(b, &decodedValues); err != nil {
decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
@ -156,7 +161,8 @@ func TestEncoding_BoolBlock_Basic(t *testing.T) {
}
var decodedValues []tsm1.Value
if err := tsm1.DecodeBlock(b, &decodedValues); err != nil {
decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
@ -179,7 +185,8 @@ func TestEncoding_StringBlock_Basic(t *testing.T) {
}
var decodedValues []tsm1.Value
if err := tsm1.DecodeBlock(b, &decodedValues); err != nil {
decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
@ -188,6 +195,42 @@ func TestEncoding_StringBlock_Basic(t *testing.T) {
}
}
func TestEncoding_BlockType(t *testing.T) {
tests := []struct {
value interface{}
blockType byte
}{
{value: float64(1.0), blockType: tsm1.BlockFloat64},
{value: int64(1), blockType: tsm1.BlockInt64},
{value: true, blockType: tsm1.BlockBool},
{value: "string", blockType: tsm1.BlockString},
}
for _, test := range tests {
var values []tsm1.Value
values = append(values, tsm1.NewValue(time.Unix(0, 0), test.value))
b, err := tsm1.Values(values).Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
bt, err := tsm1.BlockType(b)
if err != nil {
t.Fatalf("unexpected error decoding block type: %v", err)
}
if got, exp := bt, test.blockType; got != exp {
t.Fatalf("block type mismatch: got %v, exp %v", got, exp)
}
}
_, err := tsm1.BlockType([]byte{0, 0, 0, 0, 0, 0, 0, 0, 10})
if err == nil {
t.Fatalf("expected error decoding block type, got nil")
}
}
func getTimes(n, step int, precision time.Duration) []time.Time {
t := time.Now().Round(precision)
a := make([]time.Time, n)
@ -196,3 +239,291 @@ func getTimes(n, step int, precision time.Duration) []time.Time {
}
return a
}
func BenchmarkDecodeBlock_Float_Empty(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, float64(i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
var decodedValues []tsm1.Value
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Float_EqualSize(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, float64(i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]tsm1.Value, len(values))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Float_TypeSpecific(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, float64(i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]*tsm1.FloatValue, len(values))
for i := 0; i < len(decodedValues); i++ {
decodedValues[i] = &tsm1.FloatValue{}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeFloatBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Int64_Empty(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, int64(i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
var decodedValues []tsm1.Value
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Int64_EqualSize(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, int64(i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]tsm1.Value, len(values))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Int64_TypeSpecific(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, int64(i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]*tsm1.Int64Value, len(values))
for i := 0; i < len(decodedValues); i++ {
decodedValues[i] = &tsm1.Int64Value{}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeInt64Block(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Bool_Empty(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, true)
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
var decodedValues []tsm1.Value
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Bool_EqualSize(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, true)
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]tsm1.Value, len(values))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_Bool_TypeSpecific(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, true)
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]*tsm1.BoolValue, len(values))
for i := 0; i < len(decodedValues); i++ {
decodedValues[i] = &tsm1.BoolValue{}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBoolBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_String_Empty(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
var decodedValues []tsm1.Value
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_String_EqualSize(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]tsm1.Value, len(values))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}
func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
}
bytes, err := tsm1.Values(values).Encode(nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
decodedValues := make([]*tsm1.StringValue, len(values))
for i := 0; i < len(decodedValues); i++ {
decodedValues[i] = &tsm1.StringValue{}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = tsm1.DecodeStringBlock(bytes, decodedValues)
if err != nil {
b.Fatalf("unexpected error decoding block: %v", err)
}
}
}

View File

@ -1642,7 +1642,7 @@ func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime
}
var values []Value
err := DecodeBlock(block, &values)
values, err := DecodeBlock(block, values)
if err != nil {
panic(fmt.Sprintf("failure decoding block: %v", err))
}
@ -1871,7 +1871,7 @@ func (c *compactionJob) writeIDFromFile(id uint64, previousValues Values, filePo
// decode the block and append to previous values
// TODO: update this so that blocks already at their limit don't need decoding
var values []Value
err := DecodeBlock(block, &values)
values, err := DecodeBlock(block, values)
if err != nil {
panic(fmt.Sprintf("error decoding block: %s", err.Error()))
}