Merge pull request #15878 from influxdata/dn-fix-digest-race
fix(tsm1): make digest safe for concurrent usepull/15951/head
commit
edde221f55
|
@ -196,6 +196,9 @@ type Engine struct {
|
||||||
|
|
||||||
// seriesTypeMap maps a series key to field type
|
// seriesTypeMap maps a series key to field type
|
||||||
seriesTypeMap *radix.Tree
|
seriesTypeMap *radix.Tree
|
||||||
|
|
||||||
|
// muDigest ensures only one goroutine can generate a digest at a time.
|
||||||
|
muDigest sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEngine returns a new instance of Engine.
|
// NewEngine returns a new instance of Engine.
|
||||||
|
@ -283,6 +286,9 @@ func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
|
||||||
|
|
||||||
// Digest returns a reader for the shard's digest.
|
// Digest returns a reader for the shard's digest.
|
||||||
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
|
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
|
||||||
|
e.muDigest.Lock()
|
||||||
|
defer e.muDigest.Unlock()
|
||||||
|
|
||||||
log, logEnd := logger.NewOperation(e.logger, "Engine digest", "tsm1_digest")
|
log, logEnd := logger.NewOperation(e.logger, "Engine digest", "tsm1_digest")
|
||||||
defer logEnd()
|
defer logEnd()
|
||||||
|
|
||||||
|
|
|
@ -320,6 +320,57 @@ type span struct {
|
||||||
tspan *tsm1.DigestTimeSpan
|
tspan *tsm1.DigestTimeSpan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure engine handles concurrent calls to Digest().
|
||||||
|
func TestEngine_Digest_Concurrent(t *testing.T) {
|
||||||
|
e := MustOpenEngine(inmem.IndexName)
|
||||||
|
defer e.Close()
|
||||||
|
|
||||||
|
if err := e.Open(); err != nil {
|
||||||
|
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a few points.
|
||||||
|
points := []models.Point{
|
||||||
|
MustParsePointString("cpu,host=A value=1.1 1000000000"),
|
||||||
|
MustParsePointString("cpu,host=B value=1.2 2000000000"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.WritePoints(points); err != nil {
|
||||||
|
t.Fatalf("failed to write points: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force a compaction.
|
||||||
|
e.ScheduleFullCompaction()
|
||||||
|
|
||||||
|
// Start multiple waiting goroutines, ready to call Digest().
|
||||||
|
start := make(chan struct{})
|
||||||
|
errs := make(chan error)
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
for n := 0; n < 100; n++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-start
|
||||||
|
if _, _, err := e.Digest(); err != nil {
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Goroutine to close errs channel after all routines have finished.
|
||||||
|
go func() { wg.Wait(); close(errs) }()
|
||||||
|
|
||||||
|
// Signal all goroutines to call Digest().
|
||||||
|
close(start)
|
||||||
|
|
||||||
|
// Check for digest errors.
|
||||||
|
for err := range errs {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the engine will backup any TSM files created since the passed in time
|
// Ensure that the engine will backup any TSM files created since the passed in time
|
||||||
func TestEngine_Backup(t *testing.T) {
|
func TestEngine_Backup(t *testing.T) {
|
||||||
sfile := MustOpenSeriesFile()
|
sfile := MustOpenSeriesFile()
|
||||||
|
|
Loading…
Reference in New Issue