fix(tsm1): make Digest() safe for concurrent use

This change adds a lock around digest creation so that it is safe for
concurrent calls. Prior to this change, calls from multiple goroutines
resulted in "Digest aborted, problem renaming tmp digest" errors.
pull/15878/head
David Norton 2019-11-12 18:02:41 -05:00
parent 904d3279cf
commit 102fcd671b
2 changed files with 57 additions and 0 deletions

View File

@ -196,6 +196,9 @@ type Engine struct {
// seriesTypeMap maps a series key to field type
seriesTypeMap *radix.Tree
// muDigest ensures only one goroutine can generate a digest at a time.
muDigest sync.RWMutex
}
// NewEngine returns a new instance of Engine.
@ -283,6 +286,9 @@ func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
// Digest returns a reader for the shard's digest.
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
e.muDigest.Lock()
defer e.muDigest.Unlock()
log, logEnd := logger.NewOperation(e.logger, "Engine digest", "tsm1_digest")
defer logEnd()

View File

@ -320,6 +320,57 @@ type span struct {
tspan *tsm1.DigestTimeSpan
}
// Ensure engine handles concurrent calls to Digest().
func TestEngine_Digest_Concurrent(t *testing.T) {
e := MustOpenEngine(inmem.IndexName)
defer e.Close()
if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
// Create a few points.
points := []models.Point{
MustParsePointString("cpu,host=A value=1.1 1000000000"),
MustParsePointString("cpu,host=B value=1.2 2000000000"),
}
if err := e.WritePoints(points); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// Force a compaction.
e.ScheduleFullCompaction()
// Start multiple waiting goroutines, ready to call Digest().
start := make(chan struct{})
errs := make(chan error)
wg := &sync.WaitGroup{}
for n := 0; n < 100; n++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
if _, _, err := e.Digest(); err != nil {
errs <- err
}
}()
}
// Goroutine to close errs channel after all routines have finished.
go func() { wg.Wait(); close(errs) }()
// Signal all goroutines to call Digest().
close(start)
// Check for digest errors.
for err := range errs {
if err != nil {
t.Fatal(err)
}
}
}
// Ensure that the engine will backup any TSM files created since the passed in time
func TestEngine_Backup(t *testing.T) {
sfile := MustOpenSeriesFile()