From 5033783a3319aa5e10afb53f7949accf617db6af Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 10 Oct 2017 12:01:52 -0600 Subject: [PATCH 1/2] Handle deleted series when rebuilding measurement index --- tsdb/index/inmem/meta.go | 8 +++++++- tsdb/index/inmem/meta_test.go | 7 +++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index 1964f2fd39..09e847c18a 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -207,6 +207,10 @@ func (m *Measurement) CardinalityBytes(key []byte) int { // AddSeries adds a series to the measurement's index. // It returns true if the series was added successfully or false if the series was already present. func (m *Measurement) AddSeries(s *Series) bool { + if s == nil { + return false + } + m.mu.RLock() if m.seriesByID[s.ID] != nil { m.mu.RUnlock() @@ -286,7 +290,9 @@ func (m *Measurement) Rebuild() *Measurement { // expunged. Note: we're using SeriesIDs which returns the series in sorted order so that // re-adding does not incur a sort for each series added. 
for _, id := range m.SeriesIDs() { - nm.AddSeries(m.SeriesByID(id)) + if s := m.SeriesByID(id); s != nil { + nm.AddSeries(s) + } } return nm } diff --git a/tsdb/index/inmem/meta_test.go b/tsdb/index/inmem/meta_test.go index e691619e67..e265bdea30 100644 --- a/tsdb/index/inmem/meta_test.go +++ b/tsdb/index/inmem/meta_test.go @@ -93,6 +93,13 @@ func TestSeriesIDs_Reject(t *testing.T) { } } +func TestMeasurement_AddSeries_Nil(t *testing.T) { + m := inmem.NewMeasurement("foo", "cpu") + if m.AddSeries(nil) { + t.Fatalf("AddSeries mismatch: exp false, got true") + } +} + func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) { m := inmem.NewMeasurement("foo", "cpu") var dst []string From 4f8580fbaaf2eb149ecaaf0c2e1a2759a0edac1f Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 10 Oct 2017 14:36:21 -0600 Subject: [PATCH 2/2] Fix race in disableLevelCompactions There was a race on the WaitGroup where we could end up calling Add while another goroutine was still waiting. The functions were confusing so they have been simplified a bit since the compactions goroutines have been reworked a lot already. --- tsdb/engine/tsm1/engine.go | 87 ++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 50 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 46d3e7f971..6e6ecc1fed 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -97,15 +97,11 @@ type Engine struct { // The following group of fields is used to track the state of level compactions within the // Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is - // used to signal those goroutines to shutdown. Every request to disable level compactions will - // call 'Wait' on 'wg', with the first goroutine to arrive (levelWorkers == 0 while holding the - // lock) will close the done channel and re-assign 'nil' to the variable. 
Re-enabling will - // decrease 'levelWorkers', and when it decreases to zero, level compactions will be started - // back up again. + // used to signal those goroutines to shutdown. - wg sync.WaitGroup // waitgroup for active level compaction goroutines - done chan struct{} // channel to signal level compactions to stop - levelWorkers int // Number of "workers" that expect compactions to be in a disabled state + wg sync.WaitGroup // waitgroup for active level compaction goroutines + done chan struct{} // channel to signal level compactions to stop + running int32 // running tracks whether compactions are being enabled or disabled snapDone chan struct{} // channel to signal snapshot compactions to stop snapWG sync.WaitGroup // waitgroup for running snapshot compactions @@ -211,72 +207,63 @@ func (e *Engine) SetEnabled(enabled bool) { func (e *Engine) SetCompactionsEnabled(enabled bool) { if enabled { e.enableSnapshotCompactions() - e.enableLevelCompactions(false) + e.enableLevelCompactions() } else { e.disableSnapshotCompactions() - e.disableLevelCompactions(false) + e.disableLevelCompactions() } } // enableLevelCompactions will request that level compactions start back up again -// -// 'wait' signifies that a corresponding call to disableLevelCompactions(true) was made at some -// point, and the associated task that required disabled compactions is now complete -func (e *Engine) enableLevelCompactions(wait bool) { - // If we don't need to wait, see if we're already enabled - if !wait { - e.mu.RLock() - if e.done != nil { - e.mu.RUnlock() - return - } - e.mu.RUnlock() - } - - e.mu.Lock() - if wait { - e.levelWorkers -= 1 - } - if e.levelWorkers != 0 || e.done != nil { - // still waiting on more workers or already enabled - e.mu.Unlock() +func (e *Engine) enableLevelCompactions() { + // See if they are already enabled. If we set this to one, they are disabled + // and we're the first to enable. 
+ if !atomic.CompareAndSwapInt32(&e.running, 0, 1) { return } - // last one to enable, start things back up + e.mu.Lock() + defer e.mu.Unlock() + + if e.done != nil { + return + } + + // First one to enable, start things up e.Compactor.EnableCompactions() quit := make(chan struct{}) e.done = quit e.wg.Add(1) - e.mu.Unlock() go func() { defer e.wg.Done(); e.compact(quit) }() } // disableLevelCompactions will stop level compactions before returning. -// -// If 'wait' is set to true, then a corresponding call to enableLevelCompactions(true) will be -// required before level compactions will start back up again. -func (e *Engine) disableLevelCompactions(wait bool) { +func (e *Engine) disableLevelCompactions() { + // Already disabled? + if atomic.LoadInt32(&e.running) == 0 { + return + } + e.mu.Lock() - old := e.levelWorkers - if wait { - e.levelWorkers += 1 + if e.done == nil { + e.mu.Unlock() + return } - if old == 0 && e.done != nil { - // Prevent new compactions from starting - e.Compactor.DisableCompactions() + // Interrupt and disable running compactions + e.Compactor.DisableCompactions() - // Stop all background compaction goroutines - close(e.done) - e.done = nil - - } + // Stop all background compaction goroutines + close(e.done) + e.done = nil + // Allow goroutines to exit e.mu.Unlock() e.wg.Wait() + + atomic.StoreInt32(&e.running, 0) } func (e *Engine) enableSnapshotCompactions() { @@ -967,8 +954,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error { // so that snapshotting does not stop while writing out tombstones. If it is stopped, // and writing tombstones takes a long time, writes can get rejected due to the cache // filling up. - e.disableLevelCompactions(true) - defer e.enableLevelCompactions(true) + e.disableLevelCompactions() + defer e.enableLevelCompactions() tempKeys := seriesKeys[:] deleteKeys := make([][]byte, 0, len(seriesKeys))