improve startup performance
* replaces coordinating goroutines for single k-way heap merge iterator * removes contention sending keys across buffered channels startup time from 46s -> 28s for iterating 1MM keys across 14 shardspull/9162/head
parent
3c9eb9dad2
commit
e1ec331048
|
@ -303,49 +303,16 @@ func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) e
|
|||
return nil
|
||||
}
|
||||
|
||||
readers := make([]chan seriesKey, 0, len(f.files))
|
||||
done := make(chan struct{})
|
||||
for _, f := range f.files {
|
||||
ch := make(chan seriesKey, 1)
|
||||
readers = append(readers, ch)
|
||||
|
||||
go func(c chan seriesKey, r TSMFile) {
|
||||
|
||||
start := 0
|
||||
if len(seek) > 0 {
|
||||
start = r.Seek(seek)
|
||||
}
|
||||
n := r.KeyCount()
|
||||
for i := start; i < n; i++ {
|
||||
|
||||
key, typ := r.KeyAt(i)
|
||||
select {
|
||||
case <-done:
|
||||
// Abort iteration
|
||||
break
|
||||
case c <- seriesKey{key, typ}:
|
||||
}
|
||||
|
||||
}
|
||||
close(ch)
|
||||
}(ch, f)
|
||||
}
|
||||
ki := newMergeKeyIterator(f.files, seek)
|
||||
f.mu.RUnlock()
|
||||
|
||||
merged := merge(readers...)
|
||||
var err error
|
||||
for v := range merged {
|
||||
// Drain the remaing values so goroutines can exit
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err = fn(v.key, v.typ); err != nil {
|
||||
// Signal that we should stop iterating
|
||||
close(done)
|
||||
for ki.Next() {
|
||||
key, typ := ki.Read()
|
||||
if err := fn(key, typ); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Keys returns all keys and types for all files in the file store.
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
)
|
||||
|
||||
type keyIterator struct {
|
||||
f TSMFile
|
||||
p, n int
|
||||
key []byte
|
||||
typ byte
|
||||
}
|
||||
|
||||
func newKeyIterator(f TSMFile, seek []byte) *keyIterator {
|
||||
p, n := 0, f.KeyCount()
|
||||
if len(seek) > 0 {
|
||||
p = f.Seek(seek)
|
||||
}
|
||||
|
||||
if p >= n {
|
||||
return nil
|
||||
}
|
||||
|
||||
k := &keyIterator{f: f, p: p, n: n}
|
||||
k.next()
|
||||
|
||||
return k
|
||||
}
|
||||
|
||||
func (k *keyIterator) next() bool {
|
||||
if k.p < k.n {
|
||||
k.key, k.typ = k.f.KeyAt(k.p)
|
||||
k.p++
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type mergeKeyIterator struct {
|
||||
itrs keyIterators
|
||||
key []byte
|
||||
typ byte
|
||||
}
|
||||
|
||||
func newMergeKeyIterator(files []TSMFile, seek []byte) *mergeKeyIterator {
|
||||
m := &mergeKeyIterator{}
|
||||
itrs := make(keyIterators, 0, len(files))
|
||||
for _, f := range files {
|
||||
if ki := newKeyIterator(f, seek); ki != nil {
|
||||
itrs = append(itrs, ki)
|
||||
}
|
||||
}
|
||||
m.itrs = itrs
|
||||
heap.Init(&m.itrs)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *mergeKeyIterator) Next() bool {
|
||||
merging := len(m.itrs) > 1
|
||||
|
||||
RETRY:
|
||||
if len(m.itrs) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
key, typ := m.itrs[0].key, m.itrs[0].typ
|
||||
more := m.itrs[0].next()
|
||||
|
||||
switch {
|
||||
case len(m.itrs) > 1:
|
||||
if !more {
|
||||
// remove iterator from heap
|
||||
heap.Pop(&m.itrs)
|
||||
} else {
|
||||
heap.Fix(&m.itrs, 0)
|
||||
}
|
||||
|
||||
case len(m.itrs) == 1:
|
||||
if !more {
|
||||
m.itrs = nil
|
||||
}
|
||||
}
|
||||
|
||||
if merging && bytes.Compare(m.key, key) == 0 {
|
||||
// same as previous key, keep iterating
|
||||
goto RETRY
|
||||
}
|
||||
|
||||
m.key, m.typ = key, typ
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *mergeKeyIterator) Read() ([]byte, byte) { return m.key, m.typ }
|
||||
|
||||
type keyIterators []*keyIterator
|
||||
|
||||
func (k keyIterators) Len() int { return len(k) }
|
||||
func (k keyIterators) Less(i, j int) bool { return bytes.Compare(k[i].key, k[j].key) == -1 }
|
||||
func (k keyIterators) Swap(i, j int) { k[i], k[j] = k[j], k[i] }
|
||||
func (k *keyIterators) Push(x interface{}) { *k = append(*k, x.(*keyIterator)) }
|
||||
|
||||
func (k *keyIterators) Pop() interface{} {
|
||||
old := *k
|
||||
n := len(old)
|
||||
x := old[n-1]
|
||||
*k = old[:n-1]
|
||||
return x
|
||||
}
|
|
@ -0,0 +1,198 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
func TestNewMergeKeyIterator(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
seek string
|
||||
files []TSMFile
|
||||
|
||||
exp []string
|
||||
}{
|
||||
{
|
||||
name: "mixed",
|
||||
files: newTSMFiles(
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "cccc", "dddd"},
|
||||
[]string{"eeee", "ffff", "gggg"},
|
||||
[]string{"aaaa"},
|
||||
[]string{"dddd"},
|
||||
),
|
||||
exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff", "gggg"},
|
||||
},
|
||||
|
||||
{
|
||||
name: "similar keys",
|
||||
files: newTSMFiles(
|
||||
[]string{"a", "aaa"},
|
||||
[]string{"aa", "aaaa"},
|
||||
),
|
||||
exp: []string{"a", "aa", "aaa", "aaaa"},
|
||||
},
|
||||
|
||||
{
|
||||
name: "seek skips some files",
|
||||
seek: "eeee",
|
||||
files: newTSMFiles(
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "cccc", "dddd"},
|
||||
[]string{"eeee", "ffff", "gggg"},
|
||||
[]string{"aaaa"},
|
||||
[]string{"dddd"},
|
||||
),
|
||||
exp: []string{"eeee", "ffff", "gggg"},
|
||||
},
|
||||
|
||||
{
|
||||
name: "keys same across all files",
|
||||
files: newTSMFiles(
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
),
|
||||
exp: []string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
},
|
||||
|
||||
{
|
||||
name: "keys same across all files with extra",
|
||||
files: newTSMFiles(
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
|
||||
),
|
||||
exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
|
||||
},
|
||||
|
||||
{
|
||||
name: "seek skips all files",
|
||||
seek: "eeee",
|
||||
files: newTSMFiles(
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
[]string{"aaaa", "bbbb", "cccc", "dddd"},
|
||||
),
|
||||
exp: nil,
|
||||
},
|
||||
|
||||
{
|
||||
name: "keys sequential across all files",
|
||||
files: newTSMFiles(
|
||||
[]string{"a", "b", "c", "d"},
|
||||
[]string{"e", "f", "g", "h"},
|
||||
[]string{"i", "j", "k", "l"},
|
||||
),
|
||||
exp: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"},
|
||||
},
|
||||
|
||||
{
|
||||
name: "seek past one file",
|
||||
seek: "e",
|
||||
files: newTSMFiles(
|
||||
[]string{"a", "b", "c", "d"},
|
||||
[]string{"e", "f", "g", "h"},
|
||||
[]string{"i", "j", "k", "l"},
|
||||
),
|
||||
exp: []string{"e", "f", "g", "h", "i", "j", "k", "l"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ki := newMergeKeyIterator(tc.files, []byte(tc.seek))
|
||||
var act []string
|
||||
for ki.Next() {
|
||||
key, _ := ki.Read()
|
||||
act = append(act, string(key))
|
||||
}
|
||||
if !cmp.Equal(tc.exp, act) {
|
||||
t.Error(cmp.Diff(tc.exp, act))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func newTSMFiles(keys ...[]string) []TSMFile {
|
||||
var files []TSMFile
|
||||
for _, k := range keys {
|
||||
files = append(files, newMockTSMFile(k...))
|
||||
}
|
||||
return files
|
||||
}
|
||||
|
||||
type mockTSMFile struct {
|
||||
keys []string
|
||||
}
|
||||
|
||||
func newMockTSMFile(keys ...string) *mockTSMFile {
|
||||
sort.Strings(keys)
|
||||
return &mockTSMFile{keys: keys}
|
||||
}
|
||||
|
||||
func (t *mockTSMFile) KeyCount() int { return len(t.keys) }
|
||||
|
||||
func (t *mockTSMFile) Seek(key []byte) int {
|
||||
k := string(key)
|
||||
return sort.Search(len(t.keys), func(i int) bool {
|
||||
return t.keys[i] >= k
|
||||
})
|
||||
}
|
||||
|
||||
func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) {
|
||||
return []byte(t.keys[idx]), BlockFloat64
|
||||
}
|
||||
|
||||
func (*mockTSMFile) Path() string { panic("implement me") }
|
||||
func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") }
|
||||
func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") }
|
||||
func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") }
|
||||
func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") }
|
||||
func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") }
|
||||
func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") }
|
||||
func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") }
|
||||
func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") }
|
||||
func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") }
|
||||
func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") }
|
||||
func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") }
|
||||
func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") }
|
||||
func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") }
|
||||
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
|
||||
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
|
||||
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
|
||||
func (*mockTSMFile) TombstoneFiles() []FileStat { panic("implement me") }
|
||||
func (*mockTSMFile) Close() error { panic("implement me") }
|
||||
func (*mockTSMFile) Size() uint32 { panic("implement me") }
|
||||
func (*mockTSMFile) Rename(path string) error { panic("implement me") }
|
||||
func (*mockTSMFile) Remove() error { panic("implement me") }
|
||||
func (*mockTSMFile) InUse() bool { panic("implement me") }
|
||||
func (*mockTSMFile) Ref() { panic("implement me") }
|
||||
func (*mockTSMFile) Unref() { panic("implement me") }
|
||||
func (*mockTSMFile) Stats() FileStat { panic("implement me") }
|
||||
func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") }
|
||||
func (*mockTSMFile) Free() error { panic("implement me") }
|
||||
|
||||
func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadIntegerBlockAt(*IndexEntry, *[]IntegerValue) ([]IntegerValue, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadUnsignedBlockAt(*IndexEntry, *[]UnsignedValue) ([]UnsignedValue, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValue, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) {
|
||||
panic("implement me")
|
||||
}
|
Loading…
Reference in New Issue