TSM: Handle concurrent deletes for compaction

pull/7165/head
Joe LeGasse 2016-08-26 14:12:49 -04:00 committed by Jason Wilder
parent 80cc956fb6
commit eda8f70372
1 changed file with 61 additions and 69 deletions


@@ -58,13 +58,13 @@ const (
 // Engine represents a storage engine with compressed blocks.
 type Engine struct {
-    mu sync.RWMutex
-    done chan struct{}
-    snapshotterDone chan struct{}
-    wg sync.WaitGroup
-    snapshotterWg sync.WaitGroup
-    levelCompactionsEnabled bool
-    snapshotCompactionsEnabled bool
+    mu sync.RWMutex
+    wg sync.WaitGroup
+    done chan struct{}
+    levelWorkers int
+    snapDone chan struct{}
+    snapWG sync.WaitGroup

     path string
     logger *log.Logger // Logger to be used for important messages
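
For readers skimming the struct change: the comments below are my reading of what each new field is for, inferred from the rest of this diff rather than stated anywhere in the commit, and the wrapper type name is hypothetical.

    package sketch

    import "sync"

    // engineFields mirrors the fields this hunk introduces, with inferred roles.
    type engineFields struct {
        mu           sync.RWMutex
        wg           sync.WaitGroup // level/full compaction goroutines
        done         chan struct{}  // non-nil => level compactions running; closed to stop them
        levelWorkers int            // callers currently asking level compactions to stay off

        snapDone chan struct{}  // non-nil => snapshot compactions running; closed to stop them
        snapWG   sync.WaitGroup // the cache-snapshot goroutine
    }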
@@ -155,80 +155,87 @@ func (e *Engine) SetEnabled(enabled bool) {
 func (e *Engine) SetCompactionsEnabled(enabled bool) {
     if enabled {
         e.enableSnapshotCompactions()
-        e.enableLevelCompactions()
+        e.enableLevelCompactions(0)
     } else {
         e.disableSnapshotCompactions()
-        e.disableLevelCompactions()
+        e.disableLevelCompactions(0)
     }
 }
-func (e *Engine) enableLevelCompactions() {
+func (e *Engine) enableLevelCompactions(n int) {
     e.mu.Lock()
-    if e.levelCompactionsEnabled {
+    e.levelWorkers -= n
+    if e.levelWorkers != 0 || e.done != nil {
+        // still waiting on more workers or already enabled
         e.mu.Unlock()
         return
     }
-    e.levelCompactionsEnabled = true
+    // last one to enable, start things back up
     e.Compactor.EnableCompactions()
-    e.done = make(chan struct{})
     e.mu.Unlock()
+    quit := make(chan struct{})
+    e.done = quit
     e.wg.Add(4)
-    go e.compactTSMFull()
-    go e.compactTSMLevel(true, 1)
-    go e.compactTSMLevel(true, 2)
-    go e.compactTSMLevel(false, 3)
-}
-func (e *Engine) disableLevelCompactions() {
-    e.mu.Lock()
-    if !e.levelCompactionsEnabled {
-        e.mu.Unlock()
-        return
-    }
-    // Prevent new compactions from starting
-    e.levelCompactionsEnabled = false
-    e.Compactor.DisableCompactions()
-    e.mu.Unlock()
-    // Stop all background compaction goroutines
-    close(e.done)
-    // Wait for compaction goroutines to exit
+    go func() { defer e.wg.Done(); e.compactTSMFull(quit) }()
+    go func() { defer e.wg.Done(); e.compactTSMLevel(true, 1, quit) }()
+    go func() { defer e.wg.Done(); e.compactTSMLevel(true, 2, quit) }()
+    go func() { defer e.wg.Done(); e.compactTSMLevel(false, 3, quit) }()
+}
+func (e *Engine) disableLevelCompactions(n int) {
+    e.mu.Lock()
+    old := e.levelWorkers
+    e.levelWorkers += n
+    if old == 0 && e.done != nil {
+        // Prevent new compactions from starting
+        e.Compactor.DisableCompactions()
+        // Stop all background compaction goroutines
+        close(e.done)
+        e.done = nil
+    }
+    e.mu.Unlock()
     e.wg.Wait()
-    if err := e.cleanup(); err != nil {
-        e.logger.Printf("error cleaning up temp file: %v", err)
+    if old == 0 { // first to disable should cleanup
+        if err := e.cleanup(); err != nil {
+            e.logger.Printf("error cleaning up temp file: %v", err)
+        }
     }
 }
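
The two functions above amount to a counted disable: disableLevelCompactions(n) adds n holds and the first holder shuts the workers down, enableLevelCompactions(n) removes n holds and only the last one to let go restarts them, with e.done doing double duty as the "running" flag (non-nil) and the stop signal (closed). A minimal, hypothetical reduction of that pattern, with names of my own choosing (workerGate, waiters, run), might look like:

    package sketch

    import "sync"

    // workerGate: a counted "disable" guarding background workers, using a
    // quit channel and a WaitGroup, in the same shape as the diff above.
    type workerGate struct {
        mu      sync.Mutex
        waiters int                        // callers currently holding the workers off
        quit    chan struct{}              // non-nil while the workers are running
        wg      sync.WaitGroup             // tracks the worker goroutine
        run     func(quit <-chan struct{}) // the worker body
    }

    // disable adds n holds. The first holder actually stops the workers; every
    // caller returns only after the workers have exited.
    func (g *workerGate) disable(n int) {
        g.mu.Lock()
        old := g.waiters
        g.waiters += n
        if old == 0 && g.quit != nil {
            close(g.quit) // tell the worker to return
            g.quit = nil  // mark "not running" so a second disable is a no-op
        }
        g.mu.Unlock()
        g.wg.Wait()
    }

    // enable removes n holds. Only the last holder to let go restarts the workers.
    func (g *workerGate) enable(n int) {
        g.mu.Lock()
        defer g.mu.Unlock()
        g.waiters -= n
        if g.waiters != 0 || g.quit != nil {
            return // still held elsewhere, or already running
        }
        quit := make(chan struct{})
        g.quit = quit
        g.wg.Add(1)
        go func() { defer g.wg.Done(); g.run(quit) }()
    }

Under this reading, SetCompactionsEnabled maps to enable(0)/disable(0), flipping the running state without taking a hold, while the disableLevelCompactions(1)/enableLevelCompactions(1) pair used by DeleteSeriesRange is a per-caller hold, so two deletes running concurrently keep level compactions off until the second one finishes.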
 func (e *Engine) enableSnapshotCompactions() {
     e.mu.Lock()
-    if e.snapshotCompactionsEnabled {
+    if e.snapDone != nil {
         e.mu.Unlock()
         return
     }
-    e.snapshotCompactionsEnabled = true
-    e.snapshotterDone = make(chan struct{})
     e.Compactor.EnableSnapshots()
+    quit := make(chan struct{})
+    e.snapDone = quit
+    e.snapWG.Add(1)
     e.mu.Unlock()
-    e.snapshotterWg.Add(1)
-    go e.compactCache()
+    go func() { defer e.snapWG.Done(); e.compactCache(quit) }()
 }
 func (e *Engine) disableSnapshotCompactions() {
     e.mu.Lock()
-    if !e.snapshotCompactionsEnabled {
-        e.mu.Unlock()
-        return
+    if e.snapDone != nil {
+        e.Compactor.DisableSnapshots()
+        close(e.snapDone)
+        e.snapDone = nil
     }
-    e.snapshotCompactionsEnabled = false
-    e.Compactor.DisableSnapshots()
     e.mu.Unlock()
-    e.snapshotterWg.Wait()
+    e.snapWG.Wait()
 }
 // Path returns the path the engine was opened with.
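
On the snapshot side the boolean flag is gone: a non-nil snapDone now means "running", and disableSnapshotCompactions resets it to nil under the lock. As I read it, that reset does two jobs: it lets a later enable call see that the loop is stopped, and it keeps a second concurrent disable from closing an already-closed channel, which would panic. A small sketch of that guard (the function name is mine):

    package sketch

    import "sync"

    // stopOnce: only the caller that observes a non-nil channel under the lock
    // performs the close; everyone else sees nil and skips it.
    func stopOnce(mu *sync.Mutex, ch *chan struct{}) {
        mu.Lock()
        defer mu.Unlock()
        if *ch != nil {
            close(*ch) // reached at most once per started worker
            *ch = nil  // also signals "not running" to later enable calls
        }
    }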
@@ -309,8 +316,6 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
 // Open opens and initializes the engine.
 func (e *Engine) Open() error {
-    e.done = make(chan struct{})
     if err := os.MkdirAll(e.path, 0777); err != nil {
         return err
     }
@@ -688,8 +693,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
     // so that snapshotting does not stop while writing out tombstones. If it is stopped,
     // and writing tombstones takes a long time, writes can get rejected due to the cache
     // filling up.
-    e.disableLevelCompactions()
-    defer e.enableLevelCompactions()
+    e.disableLevelCompactions(1)
+    defer e.enableLevelCompactions(1)
     // keyMap is used to see if a given key should be deleted. seriesKey
     // are the measurement + tagset (minus separate & field)
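
The comment above explains the bracketing: only level compactions are held off while tombstones are written, so snapshot compactions can keep draining the cache and writes are not rejected. A sketch of the call-site shape, assuming only the two methods shown in this diff (the helper name and the writeTombstones callback are placeholders of mine, not engine code):

    package sketch

    // deleteRangeSketch shows just the bracketing added in this hunk; the
    // tombstone-writing body itself is elided.
    func deleteRangeSketch(e interface {
        disableLevelCompactions(n int)
        enableLevelCompactions(n int)
    }, writeTombstones func() error) error {
        e.disableLevelCompactions(1)      // add one caller that needs level compactions off
        defer e.enableLevelCompactions(1) // drop the hold on every return path
        return writeTombstones()          // snapshots keep running, so the cache keeps draining
    }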
@@ -860,22 +865,13 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
 }
 // compactCache continually checks if the WAL cache should be written to disk
-func (e *Engine) compactCache() {
-    defer e.snapshotterWg.Done()
+func (e *Engine) compactCache(quit <-chan struct{}) {
     for {
         select {
-        case <-e.snapshotterDone:
+        case <-quit:
             return
         default:
-            e.mu.RLock()
-            enabled := e.snapshotCompactionsEnabled
-            e.mu.RUnlock()
-            if !enabled {
-                return
-            }
             e.Cache.UpdateAge()
             if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
                 start := time.Now()
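
compactCache no longer re-reads a shared flag under an RLock on every pass; it selects on a quit channel handed to it when the goroutine started, so it never touches Engine state that disableSnapshotCompactions may already have set to nil. A minimal, hypothetical loop of that shape (poll, tick and work are my names; the real loop's sleep interval is not shown in this hunk):

    package sketch

    import "time"

    // poll runs work until quit is closed by the disabling side.
    func poll(quit <-chan struct{}, tick time.Duration, work func()) {
        for {
            select {
            case <-quit:
                return // closed once, by whoever disables the worker
            default:
                work()
                time.Sleep(tick)
            }
        }
    }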
@@ -907,12 +903,10 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
         time.Now().Sub(lastWriteTime) > e.CacheFlushWriteColdDuration
 }
-func (e *Engine) compactTSMLevel(fast bool, level int) {
-    defer e.wg.Done()
+func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) {
     for {
         select {
-        case <-e.done:
+        case <-quit:
             return
         default:
@@ -997,12 +991,10 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
     }
 }
-func (e *Engine) compactTSMFull() {
-    defer e.wg.Done()
+func (e *Engine) compactTSMFull(quit <-chan struct{}) {
     for {
         select {
-        case <-e.done:
+        case <-quit:
             return
         default: