commit
049a9a859d
|
@ -97,15 +97,11 @@ type Engine struct {
|
||||||
|
|
||||||
// The following group of fields is used to track the state of level compactions within the
|
// The following group of fields is used to track the state of level compactions within the
|
||||||
// Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is
|
// Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is
|
||||||
// used to signal those goroutines to shutdown. Every request to disable level compactions will
|
// used to signal those goroutines to shutdown.
|
||||||
// call 'Wait' on 'wg', with the first goroutine to arrive (levelWorkers == 0 while holding the
|
|
||||||
// lock) will close the done channel and re-assign 'nil' to the variable. Re-enabling will
|
|
||||||
// decrease 'levelWorkers', and when it decreases to zero, level compactions will be started
|
|
||||||
// back up again.
|
|
||||||
|
|
||||||
wg sync.WaitGroup // waitgroup for active level compaction goroutines
|
wg sync.WaitGroup // waitgroup for active level compaction goroutines
|
||||||
done chan struct{} // channel to signal level compactions to stop
|
done chan struct{} // channel to signal level compactions to stop
|
||||||
levelWorkers int // Number of "workers" that expect compactions to be in a disabled state
|
running int32 // running tracks whether compactions are being enabled or disabled
|
||||||
|
|
||||||
snapDone chan struct{} // channel to signal snapshot compactions to stop
|
snapDone chan struct{} // channel to signal snapshot compactions to stop
|
||||||
snapWG sync.WaitGroup // waitgroup for running snapshot compactions
|
snapWG sync.WaitGroup // waitgroup for running snapshot compactions
|
||||||
|
@ -211,72 +207,63 @@ func (e *Engine) SetEnabled(enabled bool) {
|
||||||
func (e *Engine) SetCompactionsEnabled(enabled bool) {
|
func (e *Engine) SetCompactionsEnabled(enabled bool) {
|
||||||
if enabled {
|
if enabled {
|
||||||
e.enableSnapshotCompactions()
|
e.enableSnapshotCompactions()
|
||||||
e.enableLevelCompactions(false)
|
e.enableLevelCompactions()
|
||||||
} else {
|
} else {
|
||||||
e.disableSnapshotCompactions()
|
e.disableSnapshotCompactions()
|
||||||
e.disableLevelCompactions(false)
|
e.disableLevelCompactions()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// enableLevelCompactions will request that level compactions start back up again
|
// enableLevelCompactions will request that level compactions start back up again
|
||||||
//
|
func (e *Engine) enableLevelCompactions() {
|
||||||
// 'wait' signifies that a corresponding call to disableLevelCompactions(true) was made at some
|
// See if they are already enabled. If we set this to one, they are disabled
|
||||||
// point, and the associated task that required disabled compactions is now complete
|
// and we're the first to enable.
|
||||||
func (e *Engine) enableLevelCompactions(wait bool) {
|
if !atomic.CompareAndSwapInt32(&e.running, 0, 1) {
|
||||||
// If we don't need to wait, see if we're already enabled
|
|
||||||
if !wait {
|
|
||||||
e.mu.RLock()
|
|
||||||
if e.done != nil {
|
|
||||||
e.mu.RUnlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
e.mu.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
e.mu.Lock()
|
|
||||||
if wait {
|
|
||||||
e.levelWorkers -= 1
|
|
||||||
}
|
|
||||||
if e.levelWorkers != 0 || e.done != nil {
|
|
||||||
// still waiting on more workers or already enabled
|
|
||||||
e.mu.Unlock()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// last one to enable, start things back up
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
if e.done != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// First one to enable, start things up
|
||||||
e.Compactor.EnableCompactions()
|
e.Compactor.EnableCompactions()
|
||||||
quit := make(chan struct{})
|
quit := make(chan struct{})
|
||||||
e.done = quit
|
e.done = quit
|
||||||
|
|
||||||
e.wg.Add(1)
|
e.wg.Add(1)
|
||||||
e.mu.Unlock()
|
|
||||||
|
|
||||||
go func() { defer e.wg.Done(); e.compact(quit) }()
|
go func() { defer e.wg.Done(); e.compact(quit) }()
|
||||||
}
|
}
|
||||||
|
|
||||||
// disableLevelCompactions will stop level compactions before returning.
|
// disableLevelCompactions will stop level compactions before returning.
|
||||||
//
|
func (e *Engine) disableLevelCompactions() {
|
||||||
// If 'wait' is set to true, then a corresponding call to enableLevelCompactions(true) will be
|
// Already disabled?
|
||||||
// required before level compactions will start back up again.
|
if atomic.LoadInt32(&e.running) == 0 {
|
||||||
func (e *Engine) disableLevelCompactions(wait bool) {
|
return
|
||||||
|
}
|
||||||
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
old := e.levelWorkers
|
if e.done == nil {
|
||||||
if wait {
|
e.mu.Unlock()
|
||||||
e.levelWorkers += 1
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if old == 0 && e.done != nil {
|
// Interrupt and disable running compactions
|
||||||
// Prevent new compactions from starting
|
e.Compactor.DisableCompactions()
|
||||||
e.Compactor.DisableCompactions()
|
|
||||||
|
|
||||||
// Stop all background compaction goroutines
|
// Stop all background compaction goroutines
|
||||||
close(e.done)
|
close(e.done)
|
||||||
e.done = nil
|
e.done = nil
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Allow goroutines to exit
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
e.wg.Wait()
|
e.wg.Wait()
|
||||||
|
|
||||||
|
atomic.StoreInt32(&e.running, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) enableSnapshotCompactions() {
|
func (e *Engine) enableSnapshotCompactions() {
|
||||||
|
@ -967,8 +954,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
||||||
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
|
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
|
||||||
// and writing tombstones takes a long time, writes can get rejected due to the cache
|
// and writing tombstones takes a long time, writes can get rejected due to the cache
|
||||||
// filling up.
|
// filling up.
|
||||||
e.disableLevelCompactions(true)
|
e.disableLevelCompactions()
|
||||||
defer e.enableLevelCompactions(true)
|
defer e.enableLevelCompactions()
|
||||||
|
|
||||||
tempKeys := seriesKeys[:]
|
tempKeys := seriesKeys[:]
|
||||||
deleteKeys := make([][]byte, 0, len(seriesKeys))
|
deleteKeys := make([][]byte, 0, len(seriesKeys))
|
||||||
|
|
|
@ -207,6 +207,10 @@ func (m *Measurement) CardinalityBytes(key []byte) int {
|
||||||
// AddSeries adds a series to the measurement's index.
|
// AddSeries adds a series to the measurement's index.
|
||||||
// It returns true if the series was added successfully or false if the series was already present.
|
// It returns true if the series was added successfully or false if the series was already present.
|
||||||
func (m *Measurement) AddSeries(s *Series) bool {
|
func (m *Measurement) AddSeries(s *Series) bool {
|
||||||
|
if s == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
if m.seriesByID[s.ID] != nil {
|
if m.seriesByID[s.ID] != nil {
|
||||||
m.mu.RUnlock()
|
m.mu.RUnlock()
|
||||||
|
@ -286,7 +290,9 @@ func (m *Measurement) Rebuild() *Measurement {
|
||||||
// expunged. Note: we're using SeriesIDs which returns the series in sorted order so that
|
// expunged. Note: we're using SeriesIDs which returns the series in sorted order so that
|
||||||
// re-adding does not incur a sort for each series added.
|
// re-adding does not incur a sort for each series added.
|
||||||
for _, id := range m.SeriesIDs() {
|
for _, id := range m.SeriesIDs() {
|
||||||
nm.AddSeries(m.SeriesByID(id))
|
if s := m.SeriesByID(id); s != nil {
|
||||||
|
nm.AddSeries(s)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nm
|
return nm
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,13 @@ func TestSeriesIDs_Reject(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMeasurement_AddSeries_Nil(t *testing.T) {
|
||||||
|
m := inmem.NewMeasurement("foo", "cpu")
|
||||||
|
if m.AddSeries(nil) {
|
||||||
|
t.Fatalf("AddSeries mismatch: exp false, got true")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
|
func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
|
||||||
m := inmem.NewMeasurement("foo", "cpu")
|
m := inmem.NewMeasurement("foo", "cpu")
|
||||||
var dst []string
|
var dst []string
|
||||||
|
|
Loading…
Reference in New Issue