Use existing TSMReader from file store during compactions
Compactions would create their own TSMReaders for simplicity. With very high cardinality compactions, creating the reader and indirectIndex can start to use a significant amount of memory. This changes the compactions to use a reader that is already allocated and managed by the FileStore.pull/8796/head
parent
739ecd2ebd
commit
91eb9de341
|
@ -41,7 +41,6 @@ var (
|
||||||
errMaxFileExceeded = fmt.Errorf("max file exceeded")
|
errMaxFileExceeded = fmt.Errorf("max file exceeded")
|
||||||
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
|
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
|
||||||
errCompactionsDisabled = fmt.Errorf("compactions disabled")
|
errCompactionsDisabled = fmt.Errorf("compactions disabled")
|
||||||
errCompactionAborted = fmt.Errorf("compaction aborted")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type errCompactionInProgress struct {
|
type errCompactionInProgress struct {
|
||||||
|
@ -56,6 +55,17 @@ func (e errCompactionInProgress) Error() string {
|
||||||
return "compaction in progress"
|
return "compaction in progress"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type errCompactionAborted struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errCompactionAborted) Error() string {
|
||||||
|
if e.err != nil {
|
||||||
|
return fmt.Sprintf("compaction aborted: %s", e.err)
|
||||||
|
}
|
||||||
|
return "compaction aborted"
|
||||||
|
}
|
||||||
|
|
||||||
// CompactionGroup represents a list of files eligible to be compacted together.
|
// CompactionGroup represents a list of files eligible to be compacted together.
|
||||||
type CompactionGroup []string
|
type CompactionGroup []string
|
||||||
|
|
||||||
|
@ -586,6 +596,7 @@ type Compactor struct {
|
||||||
|
|
||||||
FileStore interface {
|
FileStore interface {
|
||||||
NextGeneration() int
|
NextGeneration() int
|
||||||
|
TSMReader(path string) *TSMReader
|
||||||
}
|
}
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
@ -737,20 +748,17 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
|
||||||
for _, file := range tsmFiles {
|
for _, file := range tsmFiles {
|
||||||
select {
|
select {
|
||||||
case <-intC:
|
case <-intC:
|
||||||
return nil, errCompactionAborted
|
return nil, errCompactionAborted{}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := os.Open(file)
|
tr := c.FileStore.TSMReader(file)
|
||||||
if err != nil {
|
if tr == nil {
|
||||||
return nil, err
|
// This would be a bug if this occurred as tsmFiles passed in should only be
|
||||||
|
// assigned to one compaction at any one time. A nil tr would mean the file
|
||||||
|
// doesn't exist.
|
||||||
|
return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
|
||||||
}
|
}
|
||||||
|
|
||||||
tr, err := NewTSMReader(f)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer tr.Close()
|
|
||||||
trs = append(trs, tr)
|
trs = append(trs, tr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -917,7 +925,7 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if !enabled {
|
if !enabled {
|
||||||
return errCompactionAborted
|
return errCompactionAborted{}
|
||||||
}
|
}
|
||||||
// Each call to read returns the next sorted key (or the prior one if there are
|
// Each call to read returns the next sorted key (or the prior one if there are
|
||||||
// more values to write). The size of values will be less than or equal to our
|
// more values to write). The size of values will be less than or equal to our
|
||||||
|
@ -1260,7 +1268,7 @@ func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
|
||||||
// See if compactions were disabled while we were running.
|
// See if compactions were disabled while we were running.
|
||||||
select {
|
select {
|
||||||
case <-k.interrupt:
|
case <-k.interrupt:
|
||||||
return nil, 0, 0, nil, errCompactionAborted
|
return nil, 0, 0, nil, errCompactionAborted{}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1400,7 +1408,7 @@ func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
|
||||||
// See if snapshot compactions were disabled while we were running.
|
// See if snapshot compactions were disabled while we were running.
|
||||||
select {
|
select {
|
||||||
case <-c.interrupt:
|
case <-c.interrupt:
|
||||||
return nil, 0, 0, nil, errCompactionAborted
|
return nil, 0, 0, nil, errCompactionAborted{}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -116,9 +116,11 @@ func TestCompactor_CompactFull(t *testing.T) {
|
||||||
}
|
}
|
||||||
f3 := MustWriteTSM(dir, 3, writes)
|
f3 := MustWriteTSM(dir, 3, writes)
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
}
|
}
|
||||||
|
|
||||||
files, err := compactor.CompactFull([]string{f1, f2, f3})
|
files, err := compactor.CompactFull([]string{f1, f2, f3})
|
||||||
|
@ -215,9 +217,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
|
||||||
}
|
}
|
||||||
f3 := MustWriteTSM(dir, 3, writes)
|
f3 := MustWriteTSM(dir, 3, writes)
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
Size: 2,
|
Size: 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,9 +298,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
|
||||||
}
|
}
|
||||||
f3 := MustWriteTSM(dir, 3, writes)
|
f3 := MustWriteTSM(dir, 3, writes)
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
Size: 2,
|
Size: 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,9 +371,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
|
||||||
}
|
}
|
||||||
f3 := MustWriteTSM(dir, 3, writes)
|
f3 := MustWriteTSM(dir, 3, writes)
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
Size: 2,
|
Size: 2,
|
||||||
}
|
}
|
||||||
compactor.Open()
|
compactor.Open()
|
||||||
|
@ -464,9 +472,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
|
||||||
}
|
}
|
||||||
f3 := MustWriteTSM(dir, 3, writes)
|
f3 := MustWriteTSM(dir, 3, writes)
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
Size: 2,
|
Size: 2,
|
||||||
}
|
}
|
||||||
compactor.Open()
|
compactor.Open()
|
||||||
|
@ -564,9 +574,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
|
||||||
}
|
}
|
||||||
f3 := MustWriteTSM(dir, 3, writes)
|
f3 := MustWriteTSM(dir, 3, writes)
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
Size: 2,
|
Size: 2,
|
||||||
}
|
}
|
||||||
compactor.Open()
|
compactor.Open()
|
||||||
|
@ -669,9 +681,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
|
||||||
}
|
}
|
||||||
f3 := MustWriteTSM(dir, 3, writes)
|
f3 := MustWriteTSM(dir, 3, writes)
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
Size: 2,
|
Size: 2,
|
||||||
}
|
}
|
||||||
compactor.Open()
|
compactor.Open()
|
||||||
|
@ -782,9 +796,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
|
||||||
}
|
}
|
||||||
f2.Close()
|
f2.Close()
|
||||||
|
|
||||||
|
fs := &fakeFileStore{}
|
||||||
|
defer fs.Close()
|
||||||
compactor := &tsm1.Compactor{
|
compactor := &tsm1.Compactor{
|
||||||
Dir: dir,
|
Dir: dir,
|
||||||
FileStore: &fakeFileStore{},
|
FileStore: fs,
|
||||||
}
|
}
|
||||||
compactor.Open()
|
compactor.Open()
|
||||||
|
|
||||||
|
@ -2434,6 +2450,7 @@ type fakeFileStore struct {
|
||||||
PathsFn func() []tsm1.FileStat
|
PathsFn func() []tsm1.FileStat
|
||||||
lastModified time.Time
|
lastModified time.Time
|
||||||
blockCount int
|
blockCount int
|
||||||
|
readers []*tsm1.TSMReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *fakeFileStore) Stats() []tsm1.FileStat {
|
func (w *fakeFileStore) Stats() []tsm1.FileStat {
|
||||||
|
@ -2451,3 +2468,16 @@ func (w *fakeFileStore) LastModified() time.Time {
|
||||||
func (w *fakeFileStore) BlockCount(path string, idx int) int {
|
func (w *fakeFileStore) BlockCount(path string, idx int) int {
|
||||||
return w.blockCount
|
return w.blockCount
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
|
||||||
|
r := MustOpenTSMReader(path)
|
||||||
|
w.readers = append(w.readers, r)
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeFileStore) Close() {
|
||||||
|
for _, r := range w.readers {
|
||||||
|
r.Close()
|
||||||
|
}
|
||||||
|
w.readers = nil
|
||||||
|
}
|
||||||
|
|
|
@ -479,6 +479,19 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
|
||||||
return f.cost(key, min, max)
|
return f.cost(key, min, max)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reader returns a TSMReader for path if one is currently managed by the FileStore.
|
||||||
|
// Otherwise it returns nil.
|
||||||
|
func (f *FileStore) TSMReader(path string) *TSMReader {
|
||||||
|
f.mu.RLock()
|
||||||
|
defer f.mu.RUnlock()
|
||||||
|
for _, r := range f.files {
|
||||||
|
if r.Path() == path {
|
||||||
|
return r.(*TSMReader)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
|
// KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
|
||||||
func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
|
func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
|
||||||
f.mu.RLock()
|
f.mu.RLock()
|
||||||
|
|
Loading…
Reference in New Issue