diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 4b8e658274..6b60b1827c 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -303,49 +303,16 @@ func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) e return nil } - readers := make([]chan seriesKey, 0, len(f.files)) - done := make(chan struct{}) - for _, f := range f.files { - ch := make(chan seriesKey, 1) - readers = append(readers, ch) - - go func(c chan seriesKey, r TSMFile) { - - start := 0 - if len(seek) > 0 { - start = r.Seek(seek) - } - n := r.KeyCount() - for i := start; i < n; i++ { - - key, typ := r.KeyAt(i) - select { - case <-done: - // Abort iteration - break - case c <- seriesKey{key, typ}: - } - - } - close(ch) - }(ch, f) - } + ki := newMergeKeyIterator(f.files, seek) f.mu.RUnlock() - - merged := merge(readers...) - var err error - for v := range merged { - // Drain the remaing values so goroutines can exit - if err != nil { - continue - } - if err = fn(v.key, v.typ); err != nil { - // Signal that we should stop iterating - close(done) + for ki.Next() { + key, typ := ki.Read() + if err := fn(key, typ); err != nil { + return err } } - return err + return nil } // Keys returns all keys and types for all files in the file store. 
diff --git a/tsdb/engine/tsm1/file_store_key_iterator.go b/tsdb/engine/tsm1/file_store_key_iterator.go
new file mode 100644
index 0000000000..df2a51b419
--- /dev/null
+++ b/tsdb/engine/tsm1/file_store_key_iterator.go
@@ -0,0 +1,124 @@
+package tsm1
+
+import (
+	"bytes"
+	"container/heap"
+)
+
+// keyIterator walks the keys of a single TSMFile by index position, holding
+// the key and type most recently loaded by next.
+type keyIterator struct {
+	f    TSMFile
+	p, n int
+	key  []byte
+	typ  byte
+}
+
+// newKeyIterator returns an iterator over f that starts at index 0, or at the
+// index reported by f.Seek(seek) when seek is non-empty, with the first key
+// already loaded. It returns nil when that index is at or past f.KeyCount().
+func newKeyIterator(f TSMFile, seek []byte) *keyIterator {
+	p, n := 0, f.KeyCount()
+	if len(seek) > 0 {
+		p = f.Seek(seek)
+	}
+
+	if p >= n {
+		return nil
+	}
+
+	k := &keyIterator{f: f, p: p, n: n}
+	k.next()
+
+	return k
+}
+
+// next loads the key at the current index and advances the index. It reports
+// false, leaving key and typ unchanged, once every key has been loaded.
+func (k *keyIterator) next() bool {
+	if k.p < k.n {
+		k.key, k.typ = k.f.KeyAt(k.p)
+		k.p++
+		return true
+	}
+	return false
+}
+
+// mergeKeyIterator merges the keys of several TSM files into a single stream,
+// skipping a key that equals the one returned immediately before it. The
+// result is ascending and duplicate-free provided each file yields its keys in
+// ascending order.
+type mergeKeyIterator struct {
+	itrs keyIterators
+	key  []byte
+	typ  byte
+
+	// valid is true once key and typ hold a key returned by Next; it keeps the
+	// zero-value key from being treated as a previously returned key.
+	valid bool
+}
+
+// newMergeKeyIterator builds a merge iterator over files, leaving out every
+// file for which newKeyIterator returns nil.
+func newMergeKeyIterator(files []TSMFile, seek []byte) *mergeKeyIterator {
+	m := &mergeKeyIterator{}
+	itrs := make(keyIterators, 0, len(files))
+	for _, f := range files {
+		if ki := newKeyIterator(f, seek); ki != nil {
+			itrs = append(itrs, ki)
+		}
+	}
+	m.itrs = itrs
+	heap.Init(&m.itrs)
+
+	return m
+}
+
+// Next advances to the next distinct key and reports whether one was found.
+// Read returns that key afterwards.
+func (m *mergeKeyIterator) Next() bool {
+	for len(m.itrs) > 0 {
+		// The heap root holds the smallest pending key; capture it before
+		// advancing the iterator it came from.
+		top := m.itrs[0]
+		key, typ := top.key, top.typ
+
+		if !top.next() {
+			// remove the exhausted iterator from the heap
+			heap.Pop(&m.itrs)
+		} else if len(m.itrs) > 1 {
+			heap.Fix(&m.itrs, 0)
+		}
+
+		// Check against the previous key on every pass, even when only one
+		// iterator is left: it may still be positioned on the key that was
+		// just returned from a file that has since been exhausted.
+		if m.valid && bytes.Equal(m.key, key) {
+			continue
+		}
+
+		m.key, m.typ, m.valid = key, typ, true
+		return true
+	}
+
+	return false
+}
+
+// Read returns the key and type selected by the last successful call to Next.
+func (m *mergeKeyIterator) Read() ([]byte, byte) { return m.key, m.typ }
+
+// keyIterators is a heap.Interface of iterators ordered by their current key.
+type keyIterators []*keyIterator
+
+func (k keyIterators) Len() int            { return len(k) }
+func (k keyIterators) Less(i, j int) bool  { return bytes.Compare(k[i].key, k[j].key) < 0 }
+func (k keyIterators) Swap(i, j int)       { k[i], k[j] = k[j], k[i] }
+func (k *keyIterators) Push(x interface{}) { *k = append(*k, x.(*keyIterator)) }
+
+func (k *keyIterators) Pop() interface{} {
+	old := *k
+	n := len(old)
+	x := old[n-1]
+	*k = old[:n-1]
+	return x
+}
diff --git a/tsdb/engine/tsm1/file_store_key_iterator_test.go b/tsdb/engine/tsm1/file_store_key_iterator_test.go
new file mode 100644
index 0000000000..e4a099211a
--- /dev/null
+++ b/tsdb/engine/tsm1/file_store_key_iterator_test.go
@@ -0,0 +1,223 @@
+package tsm1
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+// TestNewMergeKeyIterator verifies that the merge iterator returns every
+// distinct key once, in ascending order, starting at the optional seek key.
+func TestNewMergeKeyIterator(t *testing.T) {
+	cases := []struct {
+		name  string
+		seek  string
+		files []TSMFile
+
+		exp []string
+	}{
+		{
+			name: "mixed",
+			files: newTSMFiles(
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "cccc", "dddd"},
+				[]string{"eeee", "ffff", "gggg"},
+				[]string{"aaaa"},
+				[]string{"dddd"},
+			),
+			exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff", "gggg"},
+		},
+
+		{
+			name: "similar keys",
+			files: newTSMFiles(
+				[]string{"a", "aaa"},
+				[]string{"aa", "aaaa"},
+			),
+			exp: []string{"a", "aa", "aaa", "aaaa"},
+		},
+
+		{
+			name: "seek skips some files",
+			seek: "eeee",
+			files: newTSMFiles(
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "cccc", "dddd"},
+				[]string{"eeee", "ffff", "gggg"},
+				[]string{"aaaa"},
+				[]string{"dddd"},
+			),
+			exp: []string{"eeee", "ffff", "gggg"},
+		},
+
+		{
+			name: "keys same across all files",
+			files: newTSMFiles(
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+			),
+			exp: []string{"aaaa", "bbbb", "cccc", "dddd"},
+		},
+
+		{
+			name: "keys same across all files with extra",
+			files: newTSMFiles(
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
+			),
+			exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
+		},
+
+		{
+			name: "seek skips all files",
+			seek: "eeee",
+			files: newTSMFiles(
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+				[]string{"aaaa", "bbbb", "cccc", "dddd"},
+			),
+			exp: nil,
+		},
+
+		{
+			name: "keys sequential across all files",
+			files: newTSMFiles(
+				[]string{"a", "b", "c", "d"},
+				[]string{"e", "f", "g", "h"},
+				[]string{"i", "j", "k", "l"},
+			),
+			exp: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"},
+		},
+
+		{
+			name: "seek past one file",
+			seek: "e",
+			files: newTSMFiles(
+				[]string{"a", "b", "c", "d"},
+				[]string{"e", "f", "g", "h"},
+				[]string{"i", "j", "k", "l"},
+			),
+			exp: []string{"e", "f", "g", "h", "i", "j", "k", "l"},
+		},
+
+		// Regression: a duplicate must be dropped even when it sits in the only
+		// iterator left after the other file has been exhausted.
+		{
+			name: "same key in both of two files",
+			files: newTSMFiles(
+				[]string{"a"},
+				[]string{"a"},
+			),
+			exp: []string{"a"},
+		},
+
+		{
+			name: "last key shared by two files",
+			files: newTSMFiles(
+				[]string{"a", "b"},
+				[]string{"b"},
+			),
+			exp: []string{"a", "b"},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			ki := newMergeKeyIterator(tc.files, []byte(tc.seek))
+			var act []string
+			for ki.Next() {
+				key, _ := ki.Read()
+				act = append(act, string(key))
+			}
+			if !cmp.Equal(tc.exp, act) {
+				t.Error(cmp.Diff(tc.exp, act))
+			}
+		})
+	}
+}
+
+// newTSMFiles returns one mock TSM file per list of keys.
+func newTSMFiles(keys ...[]string) []TSMFile {
+	var files []TSMFile
+	for _, k := range keys {
+		files = append(files, newMockTSMFile(k...))
+	}
+	return files
+}
+
+// mockTSMFile is a TSMFile stub backed by a sorted key list. Only KeyCount,
+// Seek and KeyAt are implemented; every other method panics.
+type mockTSMFile struct {
+	keys []string
+}
+
+// newMockTSMFile sorts keys in place and wraps them in a mockTSMFile.
+func newMockTSMFile(keys ...string) *mockTSMFile {
+	sort.Strings(keys)
+	return &mockTSMFile{keys: keys}
+}
+
+func (t *mockTSMFile) KeyCount() int { return len(t.keys) }
+
+func (t *mockTSMFile) Seek(key []byte) int {
+	k := string(key)
+	return sort.Search(len(t.keys), func(i int) bool {
+		return t.keys[i] >= k
+	})
+}
+
+func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) {
+	return []byte(t.keys[idx]), BlockFloat64
+}
+
+func (*mockTSMFile) Path() string                                               { panic("implement me") }
+func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error)                  { panic("implement me") }
+func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error)  { panic("implement me") }
+func (*mockTSMFile) Entries(key []byte) []IndexEntry                            { panic("implement me") }
+func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") }
+func (*mockTSMFile) ContainsValue(key []byte, t int64) bool                     { panic("implement me") }
+func (*mockTSMFile) Contains(key []byte) bool                                   { panic("implement me") }
+func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool                      { panic("implement me") }
+func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool                      { panic("implement me") }
+func (*mockTSMFile) TimeRange() (int64, int64)                                  { panic("implement me") }
+func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange                      { panic("implement me") }
+func (*mockTSMFile) KeyRange() ([]byte, []byte)                                 { panic("implement me") }
+func (*mockTSMFile) Type(key []byte) (byte, error)                              { panic("implement me") }
+func (*mockTSMFile) BatchDelete() BatchDeleter                                  { panic("implement me") }
+func (*mockTSMFile) Delete(keys [][]byte) error                                 { panic("implement me") }
+func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error            { panic("implement me") }
+func (*mockTSMFile) HasTombstones() bool                                        { panic("implement me") }
+func (*mockTSMFile) TombstoneFiles() []FileStat                                 { panic("implement me") }
+func (*mockTSMFile) Close() error                                               { panic("implement me") }
+func (*mockTSMFile) Size() uint32                                               { panic("implement me") }
+func (*mockTSMFile) Rename(path string) error                                   { panic("implement me") }
+func (*mockTSMFile) Remove() error                                              { panic("implement me") }
+func (*mockTSMFile) InUse() bool                                                { panic("implement me") }
+func (*mockTSMFile) Ref()                                                       { panic("implement me") }
+func (*mockTSMFile) Unref()                                                     { panic("implement me") }
+func (*mockTSMFile) Stats() FileStat                                            { panic("implement me") }
+func (*mockTSMFile) BlockIterator() *BlockIterator                              { panic("implement me") }
+func (*mockTSMFile) Free() error                                                { panic("implement me") }
+
+func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) {
+	panic("implement me")
+}
+
+func (*mockTSMFile) ReadIntegerBlockAt(*IndexEntry, *[]IntegerValue) ([]IntegerValue, error) {
+	panic("implement me")
+}
+
+func (*mockTSMFile) ReadUnsignedBlockAt(*IndexEntry, *[]UnsignedValue) ([]UnsignedValue, error) {
+	panic("implement me")
+}
+
+func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValue, error) {
+	panic("implement me")
+}
+
+func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) {
+	panic("implement me")
+}