Add Keys function to TSMIndex

Useful for testing
pull/4808/head
Jason Wilder 2015-11-13 09:07:37 -07:00
parent d8c0c26934
commit ed7cfb6df3
2 changed files with 62 additions and 7 deletions

View File

@ -114,18 +114,21 @@ type TSMIndex interface {
// Add records a new block entry for a key in the index.
Add(key string, minTime, maxTime time.Time, offset int64, size uint32)
// Entries returns all index entries for a key.
Entries(key string) []*IndexEntry
// Contains returns true if key and time might exists in this file. This function could
// return true even though the actual point does not exists. For example, the key may
// exists in this file, but not have point exactly at time t.
Contains(key string, timestamp time.Time) bool
// Entries returns all index entries for a key.
Entries(key string) []*IndexEntry
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key and timestamp, nil is returned.
Entry(key string, timestamp time.Time) *IndexEntry
// Keys returns the unique set of keys in the index.
Keys() []string
// MarshalBinary returns a byte slice encoded version of the index.
MarshalBinary() ([]byte, error)
@ -204,6 +207,15 @@ func (d *directIndex) Contains(key string, t time.Time) bool {
return d.Entry(key, t) != nil
}
func (d *directIndex) Keys() []string {
var keys []string
for k := range d.blocks {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
func (d *directIndex) addEntries(key string, entries indexEntries) {
d.blocks[key] = append(d.blocks[key], entries...)
}
@ -427,6 +439,15 @@ func (d *indirectIndex) Entry(key string, timestamp time.Time) *IndexEntry {
return nil
}
func (d *indirectIndex) Keys() []string {
var keys []string
for offset := range d.offsets {
_, key, _ := d.readKey(d.b[:offset])
keys = append(keys, key)
}
return keys
}
func (d *indirectIndex) Contains(key string, timestamp time.Time) bool {
return d.Entry(key, timestamp) != nil
}
@ -622,6 +643,10 @@ func (t *tsmReader) init() error {
return nil
}
func (t *tsmReader) Keys() []string {
return t.index.Keys()
}
func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) {
block := t.index.Entry(key, timestamp)
if block == nil {
@ -666,20 +691,26 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) {
// TODO: we can determine the max block size when loading the file create/re-use
// a reader level buf then.
b := make([]byte, 16*1024)
var pos int64
for _, block := range blocks {
_, err := t.r.Seek(block.Offset, os.SEEK_SET)
if err != nil {
return nil, err
// Skip the seek call if we are already at the position we're seeking to
if pos != block.Offset {
_, err := t.r.Seek(block.Offset, os.SEEK_SET)
if err != nil {
return nil, err
}
pos = block.Offset
}
if int(block.Size) > len(b) {
b = make([]byte, block.Size)
}
n, err := t.r.Read(b)
n, err := t.r.Read(b[:block.Size])
if err != nil {
return nil, err
}
pos += int64(block.Size)
//TODO: Validate checksum
temp = temp[:0]

View File

@ -437,3 +437,27 @@ func TestIndirectIndex_MaxBlocks(t *testing.T) {
println(err.Error())
}
}
func TestIndirectIndex_Keys(t *testing.T) {
index := tsm1.NewDirectIndex()
index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 20)
index.Add("mem", time.Unix(0, 0), time.Unix(1, 0), 10, 20)
index.Add("cpu", time.Unix(1, 0), time.Unix(2, 0), 20, 30)
keys := index.Keys()
// 2 distinct keys
if got, exp := len(keys), 2; got != exp {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}
// Keys should be sorted
if got, exp := keys[0], "cpu"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := keys[1], "mem"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
}