tsm1: ensure some race conditions are impossible
The InUse call on TSMFiles is inherently racy in the presence of Ref calls made outside of the file store mutex. In addition, we return some TSMFiles to callers without Ref'ing them, which might allow them to be closed out from underneath. That is probably impossible in practice: the only thing that gets a handle externally is compaction, compaction enforces that only one handle exists at a time, and the file is therefore deleted only once, after the compaction is done with it. But none of that is obvious or enforced.

Instead, always return a TSMFile with Ref already called under the read lock, and require that no one else calls Ref. That way, a file cannot transition to referenced if the InUse call returns false under the write lock.

The CreateSnapshot method was racy in a number of ways in the presence of multiple calls or compactions: it did not take references to the TSMFiles, and the temporary directory it creates could have been shared with concurrent CreateSnapshot calls. The files slice could also have been mutated concurrently by a compaction. Instead, under the write lock, make a local copy of the state needed for the snapshot, including the Ref calls (a write lock is implicitly a read lock). After that, no lock is needed at all.

Add comments explaining these issues at the call sites of InUse, and document that the Files method, which returns the slice unprotected, is only for tests.
parent 200fda999f
commit 7d2bb19b74
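Before the diff itself, a minimal sketch may help make the locking contract above concrete. This is not the real TSMFile/FileStore code; handle, store, Acquire, and Remove are invented names, and the real readers track references differently. It only illustrates the invariant from the message: new references are taken solely under the read lock, by the store itself, so an InUse answer obtained under the write lock cannot flip to true while that lock is held.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// handle is a stand-in for a TSMFile: a resource with a reference count.
type handle struct {
    name string
    refs int64
}

func (h *handle) Ref()        { atomic.AddInt64(&h.refs, 1) }
func (h *handle) Unref()      { atomic.AddInt64(&h.refs, -1) }
func (h *handle) InUse() bool { return atomic.LoadInt64(&h.refs) > 0 }

// store is a stand-in for the FileStore.
type store struct {
    mu      sync.RWMutex
    handles []*handle
}

// Acquire returns a handle that is already Ref'd, taken under the read lock.
// Callers must call Unref exactly once and never call Ref themselves; that is
// what keeps the InUse check in Remove trustworthy.
func (s *store) Acquire(name string) *handle {
    s.mu.RLock()
    defer s.mu.RUnlock()
    for _, h := range s.handles {
        if h.name == name {
            h.Ref()
            return h
        }
    }
    return nil
}

// Remove drops a handle only if nothing references it. Because new references
// are created only under the read lock (inside Acquire), holding the write
// lock here means the InUse answer cannot change underneath us.
func (s *store) Remove(name string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    for i, h := range s.handles {
        if h.name == name && !h.InUse() {
            s.handles = append(s.handles[:i], s.handles[i+1:]...)
            return true
        }
    }
    return false
}

func main() {
    s := &store{handles: []*handle{{name: "000001.tsm"}}}
    h := s.Acquire("000001.tsm")
    fmt.Println("removed while held:", s.Remove("000001.tsm")) // false
    h.Unref()
    fmt.Println("removed after Unref:", s.Remove("000001.tsm")) // true
}

Note that Unref may still happen with no lock at all, exactly as the comments in the diff allow: it can only move InUse from true to false, so it can never turn a "not in use, safe to close" decision into a use-after-close.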
@@ -894,6 +894,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
             // doesn't exist.
             return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
         }
+        defer tr.Unref() // inform that we're done with this reader when this method returns.
         trs = append(trs, tr)
     }
@@ -2892,6 +2892,7 @@ func (w *fakeFileStore) BlockCount(path string, idx int) int {
 func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
     r := MustOpenTSMReader(path)
     w.readers = append(w.readers, r)
+    r.Ref()
     return r
 }
@@ -267,7 +267,8 @@ func (f *FileStore) Count() int {
     return len(f.files)
 }

-// Files returns the slice of TSM files currently loaded.
+// Files returns the slice of TSM files currently loaded. This is only used for
+// tests, and the files aren't guaranteed to stay valid in the presence of compactions.
 func (f *FileStore) Files() []TSMFile {
     f.mu.RLock()
     defer f.mu.RUnlock()
@@ -595,12 +596,14 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
 }

 // Reader returns a TSMReader for path if one is currently managed by the FileStore.
-// Otherwise it returns nil.
+// Otherwise it returns nil. If it returns a file, you must call Unref on it when
+// you are done, and never use it after that.
 func (f *FileStore) TSMReader(path string) *TSMReader {
     f.mu.RLock()
     defer f.mu.RUnlock()
     for _, r := range f.files {
         if r.Path() == path {
+            r.Ref()
             return r.(*TSMReader)
         }
     }
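Given the contract documented in the hunk above, every caller of FileStore.TSMReader follows the same pattern: the reader arrives already Ref'd, the caller owes exactly one Unref, and must not touch the reader after that. A hedged sketch of such a caller (readOneFile is an invented example, placed in the same package only so the types resolve):

package tsm1 // sketch only: in the package so FileStore and TSMReader resolve

import "fmt"

// readOneFile is a hypothetical caller illustrating the documented contract:
// TSMReader hands back a reader that is already Ref'd, so the caller owes
// exactly one Unref and must never use the reader afterwards.
func readOneFile(fs *FileStore, path string) error {
    r := fs.TSMReader(path)
    if r == nil {
        return fmt.Errorf("%s is not managed by the file store", path)
    }
    defer r.Unref() // relinquish the handle exactly once

    // ... read keys/blocks from r here, strictly before this function returns ...
    return nil
}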
@@ -725,6 +728,12 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
         // way and let them complete. We'll then delete the original file to avoid
         // blocking callers upstream. If the process crashes, the temp file is
         // cleaned up at startup automatically.
+        //
+        // In order to ensure that there are no races with this (file held externally calls Ref
+        // after we check InUse), we need to maintain the invariant that every handle to a file
+        // is handed out in use (Ref'd), and handlers only ever relinquish the file once (call Unref
+        // exactly once, and never use it again). InUse is only valid during a write lock, since
+        // we allow calls to Ref and Unref under the read lock and no lock at all respectively.
         if file.InUse() {
             // Copy all the tombstones related to this TSM file
             var deletes []string
@@ -950,29 +959,36 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
 // in the path provided.
 func (f *FileStore) CreateSnapshot() (string, error) {
     f.traceLogger.Info("Creating snapshot", zap.String("dir", f.dir))
-    files := f.Files()

     f.mu.Lock()
+    // create a copy of the files slice and ensure they aren't closed out from
+    // under us, nor the slice mutated.
+    files := make([]TSMFile, len(f.files))
+    copy(files, f.files)
+
+    for _, tsmf := range files {
+        tsmf.Ref()
+        defer tsmf.Unref()
+    }
+
+    // increment and keep track of the current temp dir for when we drop the lock.
+    // this ensures we are the only writer to the directory.
     f.currentTempDirID += 1
-    f.mu.Unlock()
-
-    f.mu.RLock()
-    defer f.mu.RUnlock()
-
-    // get a tmp directory name
     tmpPath := fmt.Sprintf("%d.%s", f.currentTempDirID, TmpTSMFileExtension)
     tmpPath = filepath.Join(f.dir, tmpPath)
+    f.mu.Unlock()

+    // create the tmp directory and add the hard links. there is no longer any shared
+    // mutable state.
     err := os.Mkdir(tmpPath, 0777)
     if err != nil {
         return "", err
     }

     for _, tsmf := range files {
         newpath := filepath.Join(tmpPath, filepath.Base(tsmf.Path()))
         if err := os.Link(tsmf.Path(), newpath); err != nil {
             return "", fmt.Errorf("error creating tsm hard link: %q", err)
         }
         // Check for tombstones and link those as well
         for _, tf := range tsmf.TombstoneFiles() {
             newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
             if err := os.Link(tf.Path, newpath); err != nil {
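With CreateSnapshot now copying its state and Ref'ing the files under the write lock, a caller simply receives the path of a private <id>.tmp directory of hard links. A hypothetical backup helper (backupShard and copyDir are invented names; how the real engine divides cleanup responsibility may differ) might use it like this:

package tsm1 // sketch only: in the package so FileStore resolves

import "os"

// backupShard is a hypothetical caller of CreateSnapshot. The snapshot
// directory contains hard links to the TSM and tombstone files, so it can be
// copied at leisure without blocking writes or compactions; in this sketch the
// caller removes the directory once it is done with it.
func backupShard(fs *FileStore, copyDir func(src string) error) error {
    snapDir, err := fs.CreateSnapshot()
    if err != nil {
        return err
    }
    defer os.RemoveAll(snapDir) // best-effort cleanup of the hard links

    return copyDir(snapDir)
}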
@@ -1296,6 +1312,11 @@ func (p *purger) purge() {
     for {
         p.mu.Lock()
         for k, v := range p.files {
+            // In order to ensure that there are no races with this (file held externally calls Ref
+            // after we check InUse), we need to maintain the invariant that every handle to a file
+            // is handed out in use (Ref'd), and handlers only ever relinquish the file once (call Unref
+            // exactly once, and never use it again). InUse is only valid during a write lock, since
+            // we allow calls to Ref and Unref under the read lock and no lock at all respectively.
             if !v.InUse() {
                 if err := v.Close(); err != nil {
                     p.logger.Info("Purge: close file", zap.Error(err))