fix(tsm1): fix data race when accessing tombstone stats (#20773)

pull/20782/head
Daniel Moran 2021-02-18 20:23:57 -05:00 committed by GitHub
parent 084abc3040
commit efd766d60f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 85 additions and 120 deletions

View File

@ -13,6 +13,7 @@
1. [19936](https://github.com/influxdata/influxdb/pull/19936): Fix use-after-free bug in series ID iterator. Thanks @foobar!
1. [20585](https://github.com/influxdata/influxdb/pull/20585): Fix TSM WAL segement size check. Thanks @foobar!
1. [20754](https://github.com/influxdata/influxdb/pull/20754): Update references to docs site to use current URLs.
1. [20773](https://github.com/influxdata/influxdb/pull/20773): Fix data race in TSM engine when inspecting tombstone stats.
## v2.0.4 [2021-02-08]

View File

@ -929,7 +929,6 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
return intar.StreamFile(fi, shardRelativePath, fullPath, tw)
}
var tombstonePath string
f, err := os.Open(fullPath)
if err != nil {
return err
@ -940,9 +939,8 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
}
// Grab the tombstone file if one exists.
if r.HasTombstones() {
tombstonePath = filepath.Base(r.TombstoneFiles()[0].Path)
return intar.StreamFile(fi, shardRelativePath, tombstonePath, tw)
if ts := r.TombstoneStats(); ts.TombstoneExists {
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw)
}
min, max := r.TimeRange()

View File

@ -110,9 +110,9 @@ type TSMFile interface {
// HasTombstones returns true if file contains values that have been deleted.
HasTombstones() bool
// TombstoneFiles returns the tombstone filestats if there are any tombstones
// TombstoneStats returns the tombstone filestats if there are any tombstones
// written for this file.
TombstoneFiles() []FileStat
TombstoneStats() TombstoneStat
// Close closes the underlying file resources.
Close() error
@ -121,7 +121,7 @@ type TSMFile interface {
Size() uint32
// Rename renames the existing TSM file to a new name and replaces the mmap backing slice using the new
// file name. Index and Reader state are not re-initialized.
// file name. Index and Reader state are not re-initialized.
Rename(path string) error
// Remove deletes the file from the filesystem.
@ -205,6 +205,14 @@ type FileStat struct {
MinKey, MaxKey []byte
}
// TombstoneStat holds information about a possible tombstone file on disk.
type TombstoneStat struct {
TombstoneExists bool
Path string
LastModified int64
Size uint32
}
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
func (f FileStat) OverlapsTimeRange(min, max int64) bool {
return f.MinTime <= max && f.MaxTime >= min
@ -579,7 +587,7 @@ func (f *FileStore) Open() error {
// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
for _, ts := range res.r.TombstoneFiles() {
if ts := res.r.TombstoneStats(); ts.TombstoneExists {
atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
}
@ -812,8 +820,8 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
return err
}
for _, t := range file.TombstoneFiles() {
if err := f.obs.FileUnlinking(t.Path); err != nil {
if ts := file.TombstoneStats(); ts.TombstoneExists {
if err := f.obs.FileUnlinking(ts.Path); err != nil {
return err
}
}
@ -831,8 +839,8 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
if file.InUse() {
// Copy all the tombstones related to this TSM file
var deletes []string
for _, t := range file.TombstoneFiles() {
deletes = append(deletes, t.Path)
if ts := file.TombstoneStats(); ts.TombstoneExists {
deletes = append(deletes, ts.Path)
}
// Rename the TSM file used by this reader
@ -894,10 +902,9 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
var totalSize int64
for _, file := range f.files {
totalSize += int64(file.Size())
for _, ts := range file.TombstoneFiles() {
if ts := file.TombstoneStats(); ts.TombstoneExists {
totalSize += int64(ts.Size)
}
}
atomic.StoreInt64(&f.stats.DiskBytes, totalSize)
@ -1084,9 +1091,9 @@ func (f *FileStore) CreateSnapshot() (string, error) {
if err := os.Link(tsmf.Path(), newpath); err != nil {
return "", fmt.Errorf("error creating tsm hard link: %q", err)
}
for _, tf := range tsmf.TombstoneFiles() {
newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
if err := os.Link(tf.Path, newpath); err != nil {
if ts := tsmf.TombstoneStats(); ts.TombstoneExists {
newpath := filepath.Join(tmpPath, filepath.Base(ts.Path))
if err := os.Link(ts.Path, newpath); err != nil {
return "", fmt.Errorf("error creating tombstone hard link: %q", err)
}
}

View File

@ -168,7 +168,7 @@ func (*mockTSMFile) BatchDelete() BatchDeleter { panic("im
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
func (*mockTSMFile) TombstoneFiles() []FileStat { panic("implement me") }
func (*mockTSMFile) TombstoneStats() TombstoneStat { panic("implement me") }
func (*mockTSMFile) Close() error { panic("implement me") }
func (*mockTSMFile) Size() uint32 { panic("implement me") }
func (*mockTSMFile) Rename(path string) error { panic("implement me") }

View File

@ -2739,8 +2739,8 @@ func TestFileStore_CreateSnapshot(t *testing.T) {
if _, err := os.Stat(p); os.IsNotExist(err) {
t.Fatalf("unable to find file %q", p)
}
for _, tf := range f.TombstoneFiles() {
p := filepath.Join(s, filepath.Base(tf.Path))
if ts := f.TombstoneStats(); ts.TombstoneExists {
p := filepath.Join(s, filepath.Base(ts.Path))
t.Logf("checking for existence of hard link %q", p)
if _, err := os.Stat(p); os.IsNotExist(err) {
t.Fatalf("unable to find file %q", p)

View File

@ -524,7 +524,7 @@ func (t *TSMReader) Size() uint32 {
func (t *TSMReader) LastModified() int64 {
t.mu.RLock()
lm := t.lastModified
for _, ts := range t.tombstoner.TombstoneFiles() {
if ts := t.tombstoner.TombstoneStats(); ts.TombstoneExists {
if ts.LastModified > lm {
lm = ts.LastModified
}
@ -542,9 +542,9 @@ func (t *TSMReader) HasTombstones() bool {
}
// TombstoneFiles returns any tombstone files associated with this TSM file.
func (t *TSMReader) TombstoneFiles() []FileStat {
func (t *TSMReader) TombstoneStats() TombstoneStat {
t.mu.RLock()
fs := t.tombstoner.TombstoneFiles()
fs := t.tombstoner.TombstoneStats()
t.mu.RUnlock()
return fs
}

View File

@ -8,6 +8,8 @@ import (
"path/filepath"
"sort"
"testing"
"github.com/stretchr/testify/require"
)
func fatal(t *testing.T, msg string, err error) {
@ -465,9 +467,7 @@ func TestTSMReader_MMAP_TombstoneOutsideTimeRange(t *testing.T) {
t.Fatalf("HasTombstones mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(r.TombstoneFiles()), 0; got != exp {
t.Fatalf("TombstoneFiles len mismatch: got %v, exp %v", got, exp)
}
require.False(t, r.TombstoneStats().TombstoneExists)
}
func TestTSMReader_MMAP_TombstoneOutsideKeyRange(t *testing.T) {
@ -529,10 +529,7 @@ func TestTSMReader_MMAP_TombstoneOutsideKeyRange(t *testing.T) {
t.Fatalf("HasTombstones mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(r.TombstoneFiles()), 0; got != exp {
t.Fatalf("TombstoneFiles len mismatch: got %v, exp %v", got, exp)
}
require.False(t, r.TombstoneStats().TombstoneExists)
}
func TestTSMReader_MMAP_TombstoneOverlapKeyRange(t *testing.T) {
@ -598,9 +595,7 @@ func TestTSMReader_MMAP_TombstoneOverlapKeyRange(t *testing.T) {
t.Fatalf("HasTombstones mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(r.TombstoneFiles()), 1; got != exp {
t.Fatalf("TombstoneFiles len mismatch: got %v, exp %v", got, exp)
}
require.True(t, r.TombstoneStats().TombstoneExists)
}
func TestTSMReader_MMAP_TombstoneFullRange(t *testing.T) {

View File

@ -38,15 +38,14 @@ type Tombstoner struct {
FilterFn func(k []byte) bool
// Tombstones that have been written but not flushed to disk yet.
tombstones []Tombstone
// cache of the stats for this tombstone
fileStats []FileStat
tombstoneStats TombstoneStat
// indicates that the stats may be out of sync with what is on disk and they
// should be refreshed.
statsLoaded bool
// Tombstones that have been written but not flushed to disk yet.
tombstones []Tombstone
// These are references used for pending writes that have not been committed. If
// these are nil, then no pending writes are in progress.
gz *gzip.Writer
@ -183,43 +182,51 @@ func (t *Tombstoner) Delete() error {
// HasTombstones return true if there are any tombstone entries recorded.
func (t *Tombstoner) HasTombstones() bool {
files := t.TombstoneFiles()
stats := t.TombstoneStats()
if !stats.TombstoneExists {
return false
}
if stats.Size > 0 {
return true
}
t.mu.RLock()
n := len(t.tombstones)
t.mu.RUnlock()
return len(files) > 0 && files[0].Size > 0 || n > 0
return n > 0
}
// TombstoneFiles returns any tombstone files associated with Tombstoner's TSM file.
func (t *Tombstoner) TombstoneFiles() []FileStat {
func (t *Tombstoner) TombstoneStats() TombstoneStat {
t.mu.RLock()
if t.statsLoaded {
stats := t.fileStats
stats := t.tombstoneStats
t.mu.RUnlock()
return stats
}
t.mu.RUnlock()
stat, err := os.Stat(t.tombstonePath())
if os.IsNotExist(err) || err != nil {
if err != nil {
t.mu.Lock()
// The file doesn't exist so record that we tried to load it so
// we don't continue to keep trying. This is the common case.
t.statsLoaded = os.IsNotExist(err)
t.fileStats = t.fileStats[:0]
t.tombstoneStats.TombstoneExists = false
stats := t.tombstoneStats
t.mu.Unlock()
return nil
return stats
}
t.mu.Lock()
t.fileStats = append(t.fileStats[:0], FileStat{
Path: t.tombstonePath(),
LastModified: stat.ModTime().UnixNano(),
Size: uint32(stat.Size()),
})
t.statsLoaded = true
stats := t.fileStats
t.tombstoneStats = TombstoneStat{
TombstoneExists: true,
Path: t.tombstonePath(),
LastModified: stat.ModTime().UnixNano(),
Size: uint32(stat.Size()),
}
stats := t.tombstoneStats
t.mu.Unlock()
return stats

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/require"
)
func TestTombstoner_Add(t *testing.T) {
@ -21,10 +22,8 @@ func TestTombstoner_Add(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneStats()
require.False(t, stats.TombstoneExists)
ts.Add([][]byte{[]byte("foo")})
@ -33,22 +32,11 @@ func TestTombstoner_Add(t *testing.T) {
}
entries = mustReadAll(ts)
stats = ts.TombstoneFiles()
if got, exp := len(stats), 1; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
if stats[0].Size == 0 {
t.Fatalf("got size %v, exp > 0", stats[0].Size)
}
if stats[0].LastModified == 0 {
t.Fatalf("got lastModified %v, exp > 0", stats[0].LastModified)
}
if stats[0].Path == "" {
t.Fatalf("got path %v, exp != ''", stats[0].Path)
}
stats = ts.TombstoneStats()
require.True(t, stats.TombstoneExists)
require.NotZero(t, stats.Size)
require.NotZero(t, stats.LastModified)
require.NotEmpty(t, stats.Path)
if got, exp := len(entries), 1; got != exp {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
@ -82,10 +70,8 @@ func TestTombstoner_Add_LargeKey(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneStats()
require.False(t, stats.TombstoneExists)
key := bytes.Repeat([]byte{'a'}, 4096)
ts.Add([][]byte{key})
@ -95,22 +81,11 @@ func TestTombstoner_Add_LargeKey(t *testing.T) {
}
entries = mustReadAll(ts)
stats = ts.TombstoneFiles()
if got, exp := len(stats), 1; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
if stats[0].Size == 0 {
t.Fatalf("got size %v, exp > 0", stats[0].Size)
}
if stats[0].LastModified == 0 {
t.Fatalf("got lastModified %v, exp > 0", stats[0].LastModified)
}
if stats[0].Path == "" {
t.Fatalf("got path %v, exp != ''", stats[0].Path)
}
stats = ts.TombstoneStats()
require.True(t, stats.TombstoneExists)
require.NotZero(t, stats.Size)
require.NotZero(t, stats.LastModified)
require.NotEmpty(t, stats.Path)
if got, exp := len(entries), 1; got != exp {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
@ -144,10 +119,8 @@ func TestTombstoner_Add_Multiple(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneStats()
require.False(t, stats.TombstoneExists)
ts.Add([][]byte{[]byte("foo")})
@ -162,22 +135,11 @@ func TestTombstoner_Add_Multiple(t *testing.T) {
}
entries = mustReadAll(ts)
stats = ts.TombstoneFiles()
if got, exp := len(stats), 1; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
if stats[0].Size == 0 {
t.Fatalf("got size %v, exp > 0", stats[0].Size)
}
if stats[0].LastModified == 0 {
t.Fatalf("got lastModified %v, exp > 0", stats[0].LastModified)
}
if stats[0].Path == "" {
t.Fatalf("got path %v, exp != ''", stats[0].Path)
}
stats = ts.TombstoneStats()
require.True(t, stats.TombstoneExists)
require.NotZero(t, stats.Size)
require.NotZero(t, stats.LastModified)
require.NotEmpty(t, stats.Path)
if got, exp := len(entries), 2; got != exp {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
@ -233,11 +195,8 @@ func TestTombstoner_Add_Empty(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneStats()
require.False(t, stats.TombstoneExists)
}
func TestTombstoner_Delete(t *testing.T) {
@ -268,10 +227,8 @@ func TestTombstoner_Delete(t *testing.T) {
fatal(t, "delete tombstone", err)
}
stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}
stats := ts.TombstoneStats()
require.False(t, stats.TombstoneExists)
ts = tsm1.NewTombstoner(f.Name(), nil)
entries = mustReadAll(ts)