fix(tsi1/partition/test): fix data race in test code (#25298)

* feat: add hook for optimizing series reads based on authorizer (#25207)

* fix(edge): Backporting #24613 in to 1.11

* fix(test): fixes typo from merge in test

* fix(testing): adds error checking to cleanups

---------

Co-authored-by: Geoffrey Wossum <gwossum@influxdata.com>
pull/25307/head^2
WeblWabl 2024-09-10 12:07:13 -05:00 committed by GitHub
parent 1e710f83b8
commit ee51e5c54c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 83 additions and 62 deletions

View File

@ -177,7 +177,8 @@ testcase bare_last {
) )
result = csv.from(csv: input) result = csv.from(csv: input)
|> testing.load() |> testing.load()
|> range(start: -100y) |> range(start: 2015-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "pge_bill" and r._field == "bank")
|> last() |> last()
|> keep(columns: ["_time", "_value", "_field", "_measurement"]) |> keep(columns: ["_time", "_value", "_field", "_measurement"])

View File

@ -39,7 +39,8 @@ const ManifestFileName = "MANIFEST"
// Partition represents a collection of layered index files and WAL. // Partition represents a collection of layered index files and WAL.
type Partition struct { type Partition struct {
mu sync.RWMutex // exported for tests
Mu sync.RWMutex
opened bool opened bool
sfile *tsdb.SeriesFile // series lookup file sfile *tsdb.SeriesFile // series lookup file
@ -150,8 +151,8 @@ var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")
// Open opens the partition. // Open opens the partition.
func (p *Partition) Open() (rErr error) { func (p *Partition) Open() (rErr error) {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
p.closing = make(chan struct{}) p.closing = make(chan struct{})
@ -339,8 +340,8 @@ func (p *Partition) buildSeriesSet() error {
// CurrentCompactionN returns the number of compactions currently running. // CurrentCompactionN returns the number of compactions currently running.
func (p *Partition) CurrentCompactionN() int { func (p *Partition) CurrentCompactionN() int {
p.mu.RLock() p.Mu.RLock()
defer p.mu.RUnlock() defer p.Mu.RUnlock()
return p.currentCompactionN return p.currentCompactionN
} }
@ -367,8 +368,8 @@ func (p *Partition) Close() error {
p.Wait() p.Wait()
// Lock index and close remaining // Lock index and close remaining
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
var err error var err error
@ -402,8 +403,8 @@ func (p *Partition) SeriesFile() *tsdb.SeriesFile { return p.sfile }
// NextSequence returns the next file identifier. // NextSequence returns the next file identifier.
func (p *Partition) NextSequence() int { func (p *Partition) NextSequence() int {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
return p.nextSequence() return p.nextSequence()
} }
@ -445,8 +446,8 @@ func (p *Partition) manifest(newFileSet *FileSet) *Manifest {
// SetManifestPathForTest is only to force a bad path in testing // SetManifestPathForTest is only to force a bad path in testing
func (p *Partition) SetManifestPathForTest(path string) { func (p *Partition) SetManifestPathForTest(path string) {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
p.manifestPathFn = func() string { return path } p.manifestPathFn = func() string { return path }
} }
@ -457,16 +458,16 @@ func (p *Partition) WithLogger(logger *zap.Logger) {
// SetFieldSet sets a shared field set from the engine. // SetFieldSet sets a shared field set from the engine.
func (p *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) { func (p *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
p.mu.Lock() p.Mu.Lock()
p.fieldset = fs p.fieldset = fs
p.mu.Unlock() p.Mu.Unlock()
} }
// FieldSet returns the fieldset. // FieldSet returns the fieldset.
func (p *Partition) FieldSet() *tsdb.MeasurementFieldSet { func (p *Partition) FieldSet() *tsdb.MeasurementFieldSet {
p.mu.Lock() p.Mu.Lock()
fs := p.fieldset fs := p.fieldset
p.mu.Unlock() p.Mu.Unlock()
return fs return fs
} }
@ -476,8 +477,8 @@ func (p *Partition) RetainFileSet() (*FileSet, error) {
case <-p.closing: case <-p.closing:
return nil, tsdb.ErrIndexClosing return nil, tsdb.ErrIndexClosing
default: default:
p.mu.RLock() p.Mu.RLock()
defer p.mu.RUnlock() defer p.Mu.RUnlock()
return p.retainFileSet(), nil return p.retainFileSet(), nil
} }
} }
@ -490,8 +491,8 @@ func (p *Partition) retainFileSet() *FileSet {
// FileN returns the active files in the file set. // FileN returns the active files in the file set.
func (p *Partition) FileN() int { func (p *Partition) FileN() int {
p.mu.RLock() p.Mu.RLock()
defer p.mu.RUnlock() defer p.Mu.RUnlock()
return len(p.fileSet.files) return len(p.fileSet.files)
} }
@ -671,12 +672,12 @@ func (p *Partition) DropMeasurement(name []byte) error {
// Mark measurement as deleted. // Mark measurement as deleted.
entries = append(entries, LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}) entries = append(entries, LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name})
p.mu.RLock() p.Mu.RLock()
if err := p.activeLogFile.Writes(entries); err != nil { if err := p.activeLogFile.Writes(entries); err != nil {
p.mu.RUnlock() p.Mu.RUnlock()
return err return err
} }
p.mu.RUnlock() p.Mu.RUnlock()
// Check if the log file needs to be swapped. // Check if the log file needs to be swapped.
if err := p.CheckLogFile(); err != nil { if err := p.CheckLogFile(); err != nil {
@ -704,14 +705,14 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
defer fs.Release() defer fs.Release()
// Ensure fileset cannot change during insert. // Ensure fileset cannot change during insert.
p.mu.RLock() p.Mu.RLock()
// Insert series into log file. // Insert series into log file.
ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice, tracker) ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice, tracker)
if err != nil { if err != nil {
p.mu.RUnlock() p.Mu.RUnlock()
return nil, err return nil, err
} }
p.mu.RUnlock() p.Mu.RUnlock()
if err := p.CheckLogFile(); err != nil { if err := p.CheckLogFile(); err != nil {
return nil, err return nil, err
@ -722,8 +723,8 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
func (p *Partition) DropSeries(seriesID uint64) error { func (p *Partition) DropSeries(seriesID uint64) error {
// Delete series from index. // Delete series from index.
if err := func() error { if err := func() error {
p.mu.RLock() p.Mu.RLock()
defer p.mu.RUnlock() defer p.Mu.RUnlock()
return p.activeLogFile.DeleteSeriesID(seriesID) return p.activeLogFile.DeleteSeriesID(seriesID)
}(); err != nil { }(); err != nil {
return err return err
@ -742,8 +743,8 @@ func (p *Partition) DropSeriesList(seriesIDs []uint64) error {
// Delete series from index. // Delete series from index.
if err := func() error { if err := func() error {
p.mu.RLock() p.Mu.RLock()
defer p.mu.RUnlock() defer p.Mu.RUnlock()
return p.activeLogFile.DeleteSeriesIDList(seriesIDs) return p.activeLogFile.DeleteSeriesIDList(seriesIDs)
}(); err != nil { }(); err != nil {
return err return err
@ -910,14 +911,14 @@ func (p *Partition) AssignShard(k string, shardID uint64) {}
// Compact requests a compaction of log files. // Compact requests a compaction of log files.
func (p *Partition) Compact() { func (p *Partition) Compact() {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
p.compact() p.compact()
} }
func (p *Partition) DisableCompactions() { func (p *Partition) DisableCompactions() {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
p.compactionsDisabled++ p.compactionsDisabled++
select { select {
@ -933,8 +934,8 @@ func (p *Partition) DisableCompactions() {
} }
func (p *Partition) EnableCompactions() { func (p *Partition) EnableCompactions() {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
// Already enabled? // Already enabled?
if p.compactionsEnabled() { if p.compactionsEnabled() {
@ -952,9 +953,9 @@ func (p *Partition) runPeriodicCompaction() {
p.Compact() p.Compact()
// Avoid a race when using Reopen in tests // Avoid a race when using Reopen in tests
p.mu.RLock() p.Mu.RLock()
closing := p.closing closing := p.closing
p.mu.RUnlock() p.Mu.RUnlock()
// check for compactions once an hour (usually not necessary but a nice safety check) // check for compactions once an hour (usually not necessary but a nice safety check)
t := time.NewTicker(1 * time.Hour) t := time.NewTicker(1 * time.Hour)
@ -977,8 +978,8 @@ func (p *Partition) runPeriodicCompaction() {
// If checkRunning = true, only count as needing a compaction if there is not a compaction already // If checkRunning = true, only count as needing a compaction if there is not a compaction already
// in progress for the level that would be compacted // in progress for the level that would be compacted
func (p *Partition) NeedsCompaction(checkRunning bool) bool { func (p *Partition) NeedsCompaction(checkRunning bool) bool {
p.mu.RLock() p.Mu.RLock()
defer p.mu.RUnlock() defer p.Mu.RUnlock()
if p.needsLogCompaction() { if p.needsLogCompaction() {
return true return true
} }
@ -1031,10 +1032,10 @@ func (p *Partition) compact() {
p.currentCompactionN++ p.currentCompactionN++
go func() { go func() {
p.compactLogFile(logFile) p.compactLogFile(logFile)
p.mu.Lock() p.Mu.Lock()
p.currentCompactionN-- p.currentCompactionN--
p.levelCompacting[0] = false p.levelCompacting[0] = false
p.mu.Unlock() p.Mu.Unlock()
p.Compact() p.Compact()
}() }()
} }
@ -1074,10 +1075,10 @@ func (p *Partition) compact() {
p.compactToLevel(files, level+1, interrupt) p.compactToLevel(files, level+1, interrupt)
// Ensure compaction lock for the level is released. // Ensure compaction lock for the level is released.
p.mu.Lock() p.Mu.Lock()
p.levelCompacting[level] = false p.levelCompacting[level] = false
p.currentCompactionN-- p.currentCompactionN--
p.mu.Unlock() p.Mu.Unlock()
// Check for new compactions // Check for new compactions
p.Compact() p.Compact()
@ -1155,8 +1156,8 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
// Obtain lock to swap in index file and write manifest. // Obtain lock to swap in index file and write manifest.
if err := func() (rErr error) { if err := func() (rErr error) {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
// Replace previous files with new index file. // Replace previous files with new index file.
newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file) newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file)
@ -1220,8 +1221,8 @@ func (p *Partition) needsLogCompaction() bool {
func (p *Partition) CheckLogFile() error { func (p *Partition) CheckLogFile() error {
// Check log file under read lock. // Check log file under read lock.
needsCompaction := func() bool { needsCompaction := func() bool {
p.mu.RLock() p.Mu.RLock()
defer p.mu.RUnlock() defer p.Mu.RUnlock()
return p.needsLogCompaction() return p.needsLogCompaction()
}() }()
if !needsCompaction { if !needsCompaction {
@ -1229,8 +1230,8 @@ func (p *Partition) CheckLogFile() error {
} }
// If file size exceeded then recheck under write lock and swap files. // If file size exceeded then recheck under write lock and swap files.
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
return p.checkLogFile() return p.checkLogFile()
} }
@ -1260,9 +1261,9 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
return return
} }
p.mu.Lock() p.Mu.Lock()
interrupt := p.compactionInterrupt interrupt := p.compactionInterrupt
p.mu.Unlock() p.Mu.Unlock()
start := time.Now() start := time.Now()
@ -1312,8 +1313,8 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
// Obtain lock to swap in index file and write manifest. // Obtain lock to swap in index file and write manifest.
if err := func() (rErr error) { if err := func() (rErr error) {
p.mu.Lock() p.Mu.Lock()
defer p.mu.Unlock() defer p.Mu.Unlock()
// Replace previous log file with index file. // Replace previous log file with index file.
newFileSet := p.fileSet.MustReplace([]File{logFile}, file) newFileSet := p.fileSet.MustReplace([]File{logFile}, file)

View File

@ -74,9 +74,19 @@ func TestPartition_Open(t *testing.T) {
func TestPartition_Manifest(t *testing.T) { func TestPartition_Manifest(t *testing.T) {
t.Run("current MANIFEST", func(t *testing.T) { t.Run("current MANIFEST", func(t *testing.T) {
sfile := MustOpenSeriesFile() sfile := MustOpenSeriesFile()
defer sfile.Close() t.Cleanup(func() {
if err := sfile.Close(); err != nil {
t.Fatalf("error closing series file %v", err)
}
})
p := MustOpenPartition(sfile.SeriesFile) p := MustOpenPartition(sfile.SeriesFile)
t.Cleanup(func() {
if err := p.Close(); err != nil {
t.Fatalf("error closing partition %v", err)
}
})
if got, exp := p.Manifest().Version, tsi1.Version; got != exp { if got, exp := p.Manifest().Version, tsi1.Version; got != exp {
t.Fatalf("got MANIFEST version %d, expected %d", got, exp) t.Fatalf("got MANIFEST version %d, expected %d", got, exp)
} }
@ -98,14 +108,17 @@ func TestPartition_Manifest_Write_Fail(t *testing.T) {
func TestPartition_PrependLogFile_Write_Fail(t *testing.T) { func TestPartition_PrependLogFile_Write_Fail(t *testing.T) {
t.Run("write MANIFEST", func(t *testing.T) { t.Run("write MANIFEST", func(t *testing.T) {
sfile := MustOpenSeriesFile() sfile := MustOpenSeriesFile()
defer sfile.Close() t.Cleanup(func() {
if err := sfile.Close(); err != nil {
t.Fatalf("error closing series file %v", err)
}
})
p := MustOpenPartition(sfile.SeriesFile) p := MustOpenPartition(sfile.SeriesFile)
defer func() { t.Cleanup(func() {
if err := p.Close(); err != nil { if err := p.Close(); err != nil {
t.Fatalf("error closing partition: %v", err) t.Fatalf("error closing partition: %v", err)
} }
}() })
p.Partition.MaxLogFileSize = -1 p.Partition.MaxLogFileSize = -1
fileN := p.FileN() fileN := p.FileN()
p.CheckLogFile() p.CheckLogFile()
@ -124,15 +137,21 @@ func TestPartition_PrependLogFile_Write_Fail(t *testing.T) {
func TestPartition_Compact_Write_Fail(t *testing.T) { func TestPartition_Compact_Write_Fail(t *testing.T) {
t.Run("write MANIFEST", func(t *testing.T) { t.Run("write MANIFEST", func(t *testing.T) {
sfile := MustOpenSeriesFile() sfile := MustOpenSeriesFile()
defer sfile.Close() t.Cleanup(func() {
if err := sfile.Close(); err != nil {
t.Fatalf("error closing series file %v", err)
}
})
p := MustOpenPartition(sfile.SeriesFile) p := MustOpenPartition(sfile.SeriesFile)
defer func() { t.Cleanup(func() {
if err := p.Close(); err != nil { if err := p.Close(); err != nil {
t.Fatalf("error closing partition: %v", err) t.Fatalf("error closing partition: %v", err)
} }
}() })
p.Partition.Mu.Lock()
p.Partition.MaxLogFileSize = -1 p.Partition.MaxLogFileSize = -1
p.Partition.Mu.Unlock()
fileN := p.FileN() fileN := p.FileN()
p.Compact() p.Compact()
if (1 + fileN) != p.FileN() { if (1 + fileN) != p.FileN() {