Merge pull request #9841 from influxdata/jmw-ensure-no-race-conditions
tsm1: ensure some race conditions are impossible
pull/9858/head
commit
1a8931af42
|
@ -905,6 +905,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
|
|||
// doesn't exist.
|
||||
return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
|
||||
}
|
||||
defer tr.Unref() // inform that we're done with this reader when this method returns.
|
||||
trs = append(trs, tr)
|
||||
}
|
||||
|
||||
|
|
|
@ -2881,6 +2881,7 @@ func (w *fakeFileStore) BlockCount(path string, idx int) int {
|
|||
// TSMReader opens the TSM file at path with MustOpenTSMReader, records the
// reader in w.readers, and returns it with a reference already taken via Ref.
// NOTE(review): callers are presumably expected to call Unref exactly once when
// done, as the compactor does with the readers it obtains — confirm.
func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
|
||||
r := MustOpenTSMReader(path)
|
||||
w.readers = append(w.readers, r)
|
||||
r.Ref() // hand the reader out already marked in use
|
||||
return r
|
||||
}
|
||||
|
||||
|
|
|
@ -267,7 +267,8 @@ func (f *FileStore) Count() int {
|
|||
return len(f.files)
|
||||
}
|
||||
|
||||
// Files returns the slice of TSM files currently loaded.
|
||||
// Files returns the slice of TSM files currently loaded. This is only used for
|
||||
// tests, and the files aren't guaranteed to stay valid in the presence of compactions.
|
||||
func (f *FileStore) Files() []TSMFile {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
|
@ -595,12 +596,14 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
|
|||
}
|
||||
|
||||
// TSMReader returns a TSMReader for path if one is currently managed by the FileStore.
|
||||
// Otherwise it returns nil.
|
||||
// Otherwise it returns nil. If it returns a file, you must call Unref on it when
|
||||
// you are done, and never use it after that.
|
||||
func (f *FileStore) TSMReader(path string) *TSMReader {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
for _, r := range f.files {
|
||||
if r.Path() == path {
|
||||
r.Ref()
|
||||
return r.(*TSMReader)
|
||||
}
|
||||
}
|
||||
|
@ -725,6 +728,12 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
// way and let them complete. We'll then delete the original file to avoid
|
||||
// blocking callers upstream. If the process crashes, the temp file is
|
||||
// cleaned up at startup automatically.
|
||||
//
|
||||
// In order to ensure that there are no races with this (file held externally calls Ref
|
||||
// after we check InUse), we need to maintain the invariant that every handle to a file
|
||||
// is handed out in use (Ref'd), and handlers only ever relinquish the file once (call Unref
|
||||
// exactly once, and never use it again). InUse is only valid during a write lock, since
|
||||
// we allow calls to Ref and Unref under the read lock and no lock at all respectively.
|
||||
if file.InUse() {
|
||||
// Copy all the tombstones related to this TSM file
|
||||
var deletes []string
|
||||
|
@ -950,29 +959,36 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
|
|||
// in the path provided.
|
||||
func (f *FileStore) CreateSnapshot() (string, error) {
|
||||
f.traceLogger.Info("Creating snapshot", zap.String("dir", f.dir))
|
||||
files := f.Files()
|
||||
|
||||
f.mu.Lock()
|
||||
// create a copy of the files slice and ensure they aren't closed out from
|
||||
// under us, nor the slice mutated.
|
||||
files := make([]TSMFile, len(f.files))
|
||||
copy(files, f.files)
|
||||
|
||||
for _, tsmf := range files {
|
||||
tsmf.Ref()
|
||||
defer tsmf.Unref()
|
||||
}
|
||||
|
||||
// increment and keep track of the current temp dir for when we drop the lock.
|
||||
// this ensures we are the only writer to the directory.
|
||||
f.currentTempDirID += 1
|
||||
f.mu.Unlock()
|
||||
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
|
||||
// get a tmp directory name
|
||||
tmpPath := fmt.Sprintf("%d.%s", f.currentTempDirID, TmpTSMFileExtension)
|
||||
tmpPath = filepath.Join(f.dir, tmpPath)
|
||||
f.mu.Unlock()
|
||||
|
||||
// create the tmp directory and add the hard links. there is no longer any shared
|
||||
// mutable state.
|
||||
err := os.Mkdir(tmpPath, 0777)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
for _, tsmf := range files {
|
||||
newpath := filepath.Join(tmpPath, filepath.Base(tsmf.Path()))
|
||||
if err := os.Link(tsmf.Path(), newpath); err != nil {
|
||||
return "", fmt.Errorf("error creating tsm hard link: %q", err)
|
||||
}
|
||||
// Check for tombstones and link those as well
|
||||
for _, tf := range tsmf.TombstoneFiles() {
|
||||
newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
|
||||
if err := os.Link(tf.Path, newpath); err != nil {
|
||||
|
@ -1308,6 +1324,11 @@ func (p *purger) purge() {
|
|||
for {
|
||||
p.mu.Lock()
|
||||
for k, v := range p.files {
|
||||
// In order to ensure that there are no races with this (file held externally calls Ref
|
||||
// after we check InUse), we need to maintain the invariant that every handle to a file
|
||||
// is handed out in use (Ref'd), and handlers only ever relinquish the file once (call Unref
|
||||
// exactly once, and never use it again). InUse is only valid during a write lock, since
|
||||
// we allow calls to Ref and Unref under the read lock and no lock at all respectively.
|
||||
if !v.InUse() {
|
||||
if err := v.Close(); err != nil {
|
||||
p.logger.Info("Purge: close file", zap.Error(err))
|
||||
|
|
Loading…
Reference in New Issue