Use existing TSMReader from file store during compactions
Compactions would create their own TSMReaders for simplicity. With very high cardinality compactions, creating the reader and indirectIndex can start to use a significant amount of memory. This changes compactions to use a reader that is already allocated and managed by the FileStore.
parent 739ecd2ebd
commit 91eb9de341
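At a glance, the change swaps per-compaction reader construction for a lookup of the reader the FileStore already holds. A minimal sketch of the new shape of the loop in Compactor.compact (simplified from the diff below; assumes it sits in the tsm1 package so *TSMReader and errCompactionAborted are in scope):

    // Sketch only: mirrors the new loop in Compactor.compact below.
    func openReaders(fs interface {
        NextGeneration() int
        TSMReader(path string) *TSMReader
    }, tsmFiles []string) ([]*TSMReader, error) {
        var trs []*TSMReader
        for _, file := range tsmFiles {
            // Previously: os.Open(file) + NewTSMReader(f), which rebuilt the
            // file's indirectIndex and cost memory for every compaction.
            tr := fs.TSMReader(file) // reuse the FileStore's already-open reader
            if tr == nil {
                // The planner should never hand us a file the store doesn't own.
                return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
            }
            trs = append(trs, tr)
        }
        return trs, nil
    }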
@@ -41,7 +41,6 @@ var (
 	errMaxFileExceeded     = fmt.Errorf("max file exceeded")
 	errSnapshotsDisabled   = fmt.Errorf("snapshots disabled")
 	errCompactionsDisabled = fmt.Errorf("compactions disabled")
-	errCompactionAborted   = fmt.Errorf("compaction aborted")
 )
 
 type errCompactionInProgress struct {
@@ -56,6 +55,17 @@ func (e errCompactionInProgress) Error() string {
 	return "compaction in progress"
 }
 
+type errCompactionAborted struct {
+	err error
+}
+
+func (e errCompactionAborted) Error() string {
+	if e.err != nil {
+		return fmt.Sprintf("compaction aborted: %s", e.err)
+	}
+	return "compaction aborted"
+}
+
 // CompactionGroup represents a list of files eligible to be compacted together.
 type CompactionGroup []string
 
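Turning errCompactionAborted from a sentinel var into a struct lets it carry a cause while still being matched by type. A hedged sketch of how package-internal callers can tell an abort from a real failure (doCompaction is a hypothetical stand-in, not part of this commit):

    // Sketch: matching the new error type by type assertion.
    if err := doCompaction(); err != nil {
        if _, ok := err.(errCompactionAborted); ok {
            // Aborts are expected when compactions are disabled or the
            // engine is shutting down; they are not data errors.
            log.Println("compaction aborted:", err)
        } else {
            log.Println("compaction failed:", err)
        }
    }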
@@ -586,6 +596,7 @@ type Compactor struct {
 
 	FileStore interface {
 		NextGeneration() int
+		TSMReader(path string) *TSMReader
 	}
 
 	mu sync.RWMutex
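Keeping FileStore as a two-method interface on the Compactor (rather than the concrete *FileStore) is what lets the tests below swap in a fake. A minimal sketch of a conforming stub (names here are illustrative; the commit's real test double, fakeFileStore, appears later in this diff):

    // Sketch: any type with these two methods satisfies Compactor.FileStore.
    type stubFileStore struct {
        gen     int
        readers map[string]*TSMReader
    }

    func (s *stubFileStore) NextGeneration() int { s.gen++; return s.gen }

    func (s *stubFileStore) TSMReader(path string) *TSMReader {
        return s.readers[path] // nil for unknown paths, as the Compactor expects
    }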
@@ -737,20 +748,17 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
 	for _, file := range tsmFiles {
 		select {
 		case <-intC:
-			return nil, errCompactionAborted
+			return nil, errCompactionAborted{}
 		default:
 		}
 
-		f, err := os.Open(file)
-		if err != nil {
-			return nil, err
-		}
-
-		tr, err := NewTSMReader(f)
-		if err != nil {
-			return nil, err
+		tr := c.FileStore.TSMReader(file)
+		if tr == nil {
+			// This would be a bug if this occurred as tsmFiles passed in should only be
+			// assigned to one compaction at any one time. A nil tr would mean the file
+			// doesn't exist.
+			return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
 		}
 		defer tr.Close()
 		trs = append(trs, tr)
 	}
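The non-blocking receive on intC at the top of the loop is the usual Go cancellation idiom: once the interrupt channel is closed, the case fires on the next iteration, so a long compaction stops between files instead of mid-write. A standalone sketch of the pattern (the surrounding function is illustrative):

    // Sketch: abort promptly once the interrupt channel is closed.
    func processFiles(files []string, intC <-chan struct{}) error {
        for range files {
            select {
            case <-intC:
                return errCompactionAborted{} // channel closed: stop between files
            default:
                // channel open and empty: keep going with this file
            }
            // ... compact the file ...
        }
        return nil
    }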
@@ -917,7 +925,7 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
 	c.mu.RUnlock()
 
 	if !enabled {
-		return errCompactionAborted
+		return errCompactionAborted{}
 	}
 	// Each call to read returns the next sorted key (or the prior one if there are
 	// more values to write). The size of values will be less than or equal to our
@@ -1260,7 +1268,7 @@ func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
 	// See if compactions were disabled while we were running.
 	select {
 	case <-k.interrupt:
-		return nil, 0, 0, nil, errCompactionAborted
+		return nil, 0, 0, nil, errCompactionAborted{}
 	default:
 	}
 
@@ -1400,7 +1408,7 @@ func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
 	// See if snapshot compactions were disabled while we were running.
 	select {
 	case <-c.interrupt:
-		return nil, 0, 0, nil, errCompactionAborted
+		return nil, 0, 0, nil, errCompactionAborted{}
 	default:
 	}
 
@@ -116,9 +116,11 @@ func TestCompactor_CompactFull(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 	}
 
 	files, err := compactor.CompactFull([]string{f1, f2, f3})
@@ -215,9 +217,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 
@@ -294,9 +298,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 
@@ -365,9 +371,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -464,9 +472,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -564,9 +574,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -669,9 +681,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 		Size:      2,
 	}
 	compactor.Open()
@@ -782,9 +796,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
 	}
 	f2.Close()
 
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := &tsm1.Compactor{
 		Dir:       dir,
-		FileStore: &fakeFileStore{},
+		FileStore: fs,
 	}
 	compactor.Open()
 
@@ -2434,6 +2450,7 @@ type fakeFileStore struct {
 	PathsFn      func() []tsm1.FileStat
 	lastModified time.Time
 	blockCount   int
+	readers      []*tsm1.TSMReader
 }
 
 func (w *fakeFileStore) Stats() []tsm1.FileStat {
@@ -2451,3 +2468,16 @@ func (w *fakeFileStore) LastModified() time.Time {
 func (w *fakeFileStore) BlockCount(path string, idx int) int {
 	return w.blockCount
 }
+
+func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
+	r := MustOpenTSMReader(path)
+	w.readers = append(w.readers, r)
+	return r
+}
+
+func (w *fakeFileStore) Close() {
+	for _, r := range w.readers {
+		r.Close()
+	}
+	w.readers = nil
+}
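Because fakeFileStore now opens real readers on demand, it has to track and release them, which is why every test above gained a defer fs.Close(). Pulled out of the hunks above, the per-test wiring is:

    fs := &fakeFileStore{}
    defer fs.Close() // closes every TSMReader the fake handed out

    compactor := &tsm1.Compactor{
        Dir:       dir,
        FileStore: fs,
    }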
@@ -479,6 +479,19 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
 	return f.cost(key, min, max)
 }
 
+// TSMReader returns the TSMReader for path if it is currently managed by the
+// FileStore. Otherwise it returns nil.
+func (f *FileStore) TSMReader(path string) *TSMReader {
+	f.mu.RLock()
+	defer f.mu.RUnlock()
+	for _, r := range f.files {
+		if r.Path() == path {
+			return r.(*TSMReader)
+		}
+	}
+	return nil
+}
+
 // KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
 func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
 	f.mu.RLock()
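FileStore.TSMReader is a linear scan under the read lock, which should be cheap at typical TSM file counts relative to the compaction work itself. A hedged usage sketch from the caller's side (mirrors Compactor.compact above):

    // Sketch: resolving a path to its live reader before compacting.
    if tr := fs.TSMReader(path); tr != nil {
        defer tr.Close()
        // ... feed tr into the compaction's key iterators ...
    } else {
        // nil: the FileStore no longer tracks path, so the plan is stale.
    }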