Use existing TSMReader from file store during compactions

Compactions previously created their own TSMReaders for simplicity. With
very high cardinality compactions, creating the reader and its indirectIndex
can use a significant amount of memory.

This changes compactions to use a reader that is already allocated
and managed by the FileStore.
pull/8796/head
Jason Wilder 2017-08-24 16:17:13 -06:00
parent 739ecd2ebd
commit 91eb9de341
3 changed files with 73 additions and 22 deletions
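To make the intent concrete before the diff: below is a minimal, self-contained sketch of the pattern this commit adopts, using simplified stand-in types rather than the real tsm1 package. The compactor asks the file store for a reader it already manages instead of opening the file and building a fresh index, and it no longer closes the reader, since the FileStore owns its lifetime.

package main

import "fmt"

// reader stands in for *tsm1.TSMReader; the real one carries a large
// in-memory index, which is what this change avoids duplicating.
type reader struct{ path string }

// fileStore stands in for the FileStore: it owns one open reader per file.
type fileStore struct{ files []*reader }

// TSMReader mirrors the new FileStore.TSMReader: a linear scan over the
// managed readers, returning nil when the path is not managed.
func (fs *fileStore) TSMReader(path string) *reader {
	for _, r := range fs.files {
		if r.path == path {
			return r
		}
	}
	return nil
}

// compact mirrors the updated Compactor.compact loop: reuse the store's
// reader, treat a missing one as a planning bug, and never Close it here.
func compact(fs *fileStore, tsmFiles []string) error {
	for _, file := range tsmFiles {
		tr := fs.TSMReader(file)
		if tr == nil {
			return fmt.Errorf("compaction aborted: bad plan: %s", file)
		}
		_ = tr // the real code appends tr to the key-iterator inputs
	}
	return nil
}

func main() {
	fs := &fileStore{files: []*reader{{path: "000001-01.tsm"}}}
	fmt.Println(compact(fs, []string{"000001-01.tsm"})) // <nil>
	fmt.Println(compact(fs, []string{"missing.tsm"}))   // compaction aborted: bad plan: missing.tsm
}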

tsdb/engine/tsm1/compact.go

@@ -41,7 +41,6 @@ var (
 	errMaxFileExceeded     = fmt.Errorf("max file exceeded")
 	errSnapshotsDisabled   = fmt.Errorf("snapshots disabled")
 	errCompactionsDisabled = fmt.Errorf("compactions disabled")
-	errCompactionAborted   = fmt.Errorf("compaction aborted")
 )
 
 type errCompactionInProgress struct {
@@ -56,6 +55,17 @@ func (e errCompactionInProgress) Error() string {
 	return "compaction in progress"
 }
 
+type errCompactionAborted struct {
+	err error
+}
+
+func (e errCompactionAborted) Error() string {
+	if e.err != nil {
+		return fmt.Sprintf("compaction aborted: %s", e.err)
+	}
+	return "compaction aborted"
+}
+
 // CompactionGroup represents a list of files eligible to be compacted together.
 type CompactionGroup []string
@@ -586,6 +596,7 @@ type Compactor struct {
 	FileStore interface {
 		NextGeneration() int
+		TSMReader(path string) *TSMReader
 	}
 
 	mu sync.RWMutex
@@ -737,20 +748,17 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
 	for _, file := range tsmFiles {
 		select {
 		case <-intC:
-			return nil, errCompactionAborted
+			return nil, errCompactionAborted{}
 		default:
 		}
 
-		f, err := os.Open(file)
-		if err != nil {
-			return nil, err
-		}
-
-		tr, err := NewTSMReader(f)
-		if err != nil {
-			return nil, err
-		}
-		defer tr.Close()
+		tr := c.FileStore.TSMReader(file)
+		if tr == nil {
+			// This would be a bug if this occurred as tsmFiles passed in should only be
+			// assigned to one compaction at any one time. A nil tr would mean the file
+			// doesn't exist.
+			return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
+		}
 		trs = append(trs, tr)
 	}
@@ -917,7 +925,7 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
 	c.mu.RUnlock()
 	if !enabled {
-		return errCompactionAborted
+		return errCompactionAborted{}
 	}
 
 	// Each call to read returns the next sorted key (or the prior one if there are
 	// more values to write). The size of values will be less than or equal to our
@@ -1260,7 +1268,7 @@ func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
 	// See if compactions were disabled while we were running.
 	select {
 	case <-k.interrupt:
-		return nil, 0, 0, nil, errCompactionAborted
+		return nil, 0, 0, nil, errCompactionAborted{}
 	default:
 	}
@@ -1400,7 +1408,7 @@ func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
 	// See if snapshot compactions were disabled while we were running.
 	select {
 	case <-c.interrupt:
-		return nil, 0, 0, nil, errCompactionAborted
+		return nil, 0, 0, nil, errCompactionAborted{}
 	default:
 	}

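The sentinel error value removed in the first hunk becomes a struct type so an abort can carry context such as the offending file. Below is a standalone sketch of how a caller might distinguish an abort from other failures, using a type assertion (the idiom contemporary with this commit, which predates errors.As); the "safe to replan" reaction is illustrative, not part of the change.

package main

import "fmt"

type errCompactionAborted struct{ err error }

func (e errCompactionAborted) Error() string {
	if e.err != nil {
		return fmt.Sprintf("compaction aborted: %s", e.err)
	}
	return "compaction aborted"
}

func main() {
	var err error = errCompactionAborted{fmt.Errorf("bad plan: 000001-01.tsm")}

	// Matching on the type rather than comparing to a sentinel value
	// keeps detection working no matter what context the error carries.
	if _, ok := err.(errCompactionAborted); ok {
		fmt.Println("aborted, not corrupt; safe to replan:", err)
	}
}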
tsdb/engine/tsm1/compact_test.go

@@ -116,9 +116,11 @@ func TestCompactor_CompactFull(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 	}
 
 	files, err := compactor.CompactFull([]string{f1, f2, f3})
@@ -215,9 +217,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
@@ -294,9 +298,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
@@ -365,9 +371,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -464,9 +472,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -564,9 +574,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -669,9 +681,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -782,9 +796,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
 	}
 	f2.Close()
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 	}
 	compactor.Open()
@@ -2434,6 +2450,7 @@ type fakeFileStore struct {
 	PathsFn      func() []tsm1.FileStat
 	lastModified time.Time
 	blockCount   int
+	readers      []*tsm1.TSMReader
 }
 
 func (w *fakeFileStore) Stats() []tsm1.FileStat {
@@ -2451,3 +2468,16 @@ func (w *fakeFileStore) LastModified() time.Time {
 func (w *fakeFileStore) BlockCount(path string, idx int) int {
 	return w.blockCount
 }
+
+func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
+	r := MustOpenTSMReader(path)
+	w.readers = append(w.readers, r)
+	return r
+}
+
+func (w *fakeFileStore) Close() {
+	for _, r := range w.readers {
+		r.Close()
+	}
+	w.readers = nil
+}

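The fake gains this bookkeeping because, unlike the production FileStore, nothing else owns the readers it opens; each test therefore defers a single fs.Close() to release every reader opened during the compaction. A standalone sketch of the same track-and-close pattern, with stand-in types in place of the real tsm1 ones:

package main

import "fmt"

// stubReader stands in for *tsm1.TSMReader in this sketch.
type stubReader struct{ path string }

func (r *stubReader) Close() error {
	fmt.Println("closed", r.path)
	return nil
}

// fakeStore mirrors the fakeFileStore additions: every reader handed out
// is recorded so a single deferred Close releases them all.
type fakeStore struct{ readers []*stubReader }

func (f *fakeStore) TSMReader(path string) *stubReader {
	r := &stubReader{path: path} // stand-in for MustOpenTSMReader(path)
	f.readers = append(f.readers, r)
	return r
}

func (f *fakeStore) Close() {
	for _, r := range f.readers {
		r.Close()
	}
	f.readers = nil
}

func main() {
	fs := &fakeStore{}
	defer fs.Close() // the pattern each updated test adopts

	fs.TSMReader("000001-01.tsm")
	fs.TSMReader("000002-01.tsm")
}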
tsdb/engine/tsm1/file_store.go

@@ -479,6 +479,19 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
 	return f.cost(key, min, max)
 }
 
+// TSMReader returns the TSMReader for path if one is currently managed by the FileStore.
+// Otherwise it returns nil.
+func (f *FileStore) TSMReader(path string) *TSMReader {
+	f.mu.RLock()
+	defer f.mu.RUnlock()
+	for _, r := range f.files {
+		if r.Path() == path {
+			return r.(*TSMReader)
+		}
+	}
+	return nil
+}
+
 // KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
 func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
 	f.mu.RLock()
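A note on the design: the lookup is a linear scan under the store's read lock, O(number of TSM files), but it runs only once per file per compaction, so it is far cheaper than maintaining a second index would be. If lookup frequency ever mattered, a map keyed by path is the obvious variant; the sketch below is hypothetical and not part of this commit (the byPath field and its upkeep on file add/replace/remove are assumptions for illustration).

package filestore

import "sync"

// TSMReader is a stand-in for the real reader type.
type TSMReader struct{ path string }

// FileStore sketches a hypothetical variant that indexes readers by path.
// The byPath map would have to be maintained everywhere files are added,
// replaced, or removed, which is the bookkeeping the commit avoids.
type FileStore struct {
	mu     sync.RWMutex
	byPath map[string]*TSMReader
}

// TSMReader returns the managed reader for path, or nil, in O(1).
func (f *FileStore) TSMReader(path string) *TSMReader {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.byPath[path] // nil when path is not managed
}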