From ed7cfb6df36a4d448c9d0420d7a23eb07cf552a8 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 13 Nov 2015 09:07:37 -0700 Subject: [PATCH] Add Keys function to TSMIndex Useful for testing --- tsdb/engine/tsm1/data_file.go | 45 +++++++++++++++++++++++++----- tsdb/engine/tsm1/data_file_test.go | 24 ++++++++++++++++ 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index d67481c084..a1af8226a5 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -114,18 +114,21 @@ type TSMIndex interface { // Add records a new block entry for a key in the index. Add(key string, minTime, maxTime time.Time, offset int64, size uint32) - // Entries returns all index entries for a key. - Entries(key string) []*IndexEntry - // Contains returns true if key and time might exists in this file. This function could // return true even though the actual point does not exists. For example, the key may // exists in this file, but not have point exactly at time t. Contains(key string, timestamp time.Time) bool + // Entries returns all index entries for a key. + Entries(key string) []*IndexEntry + // Entry returns the index entry for the specified key and timestamp. If no entry // matches the key and timestamp, nil is returned. Entry(key string, timestamp time.Time) *IndexEntry + // Keys returns the unique set of keys in the index. + Keys() []string + // MarshalBinary returns a byte slice encoded version of the index. MarshalBinary() ([]byte, error) @@ -204,6 +207,15 @@ func (d *directIndex) Contains(key string, t time.Time) bool { return d.Entry(key, t) != nil } +func (d *directIndex) Keys() []string { + var keys []string + for k := range d.blocks { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + func (d *directIndex) addEntries(key string, entries indexEntries) { d.blocks[key] = append(d.blocks[key], entries...) } @@ -427,6 +439,15 @@ func (d *indirectIndex) Entry(key string, timestamp time.Time) *IndexEntry { return nil } +func (d *indirectIndex) Keys() []string { + var keys []string + for offset := range d.offsets { + _, key, _ := d.readKey(d.b[:offset]) + keys = append(keys, key) + } + return keys +} + func (d *indirectIndex) Contains(key string, timestamp time.Time) bool { return d.Entry(key, timestamp) != nil } @@ -622,6 +643,10 @@ func (t *tsmReader) init() error { return nil } +func (t *tsmReader) Keys() []string { + return t.index.Keys() +} + func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) { block := t.index.Entry(key, timestamp) if block == nil { @@ -666,20 +691,26 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) { // TODO: we can determine the max block size when loading the file create/re-use // a reader level buf then. b := make([]byte, 16*1024) + var pos int64 for _, block := range blocks { - _, err := t.r.Seek(block.Offset, os.SEEK_SET) - if err != nil { - return nil, err + // Skip the seek call if we are already at the position we're seeking to + if pos != block.Offset { + _, err := t.r.Seek(block.Offset, os.SEEK_SET) + if err != nil { + return nil, err + } + pos = block.Offset } if int(block.Size) > len(b) { b = make([]byte, block.Size) } - n, err := t.r.Read(b) + n, err := t.r.Read(b[:block.Size]) if err != nil { return nil, err } + pos += int64(block.Size) //TODO: Validate checksum temp = temp[:0] diff --git a/tsdb/engine/tsm1/data_file_test.go b/tsdb/engine/tsm1/data_file_test.go index b78a9d3a25..4987b047ba 100644 --- a/tsdb/engine/tsm1/data_file_test.go +++ b/tsdb/engine/tsm1/data_file_test.go @@ -437,3 +437,27 @@ func TestIndirectIndex_MaxBlocks(t *testing.T) { println(err.Error()) } } + +func TestIndirectIndex_Keys(t *testing.T) { + index := tsm1.NewDirectIndex() + index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 20) + index.Add("mem", time.Unix(0, 0), time.Unix(1, 0), 10, 20) + index.Add("cpu", time.Unix(1, 0), time.Unix(2, 0), 20, 30) + + keys := index.Keys() + + // 2 distinct keys + if got, exp := len(keys), 2; got != exp { + t.Fatalf("length mismatch: got %v, exp %v", got, exp) + } + + // Keys should be sorted + if got, exp := keys[0], "cpu"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := keys[1], "mem"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + +}