From 97f61e0ff4bfa794735b117e04a6cce069815484 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 18 Jan 2018 15:54:52 -0700 Subject: [PATCH] Allow SeriesFile compaction to be disabled --- tsdb/engine/tsm1/engine.go | 5 +++++ tsdb/series_file.go | 20 ++++++++++++++++++++ tsdb/series_partition.go | 25 +++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 92a61d314b..020dca3523 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1206,6 +1206,11 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro // filling up. e.disableLevelCompactions(true) defer e.enableLevelCompactions(true) + + e.sfile.DisableCompactions() + defer e.sfile.EnableCompactions() + e.sfile.Wait() + disableOnce = true } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 9ceb032c77..74005b94db 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -111,6 +111,26 @@ func (f *SeriesFile) Retain() func() { return nop } +// EnableCompactions allows compactions to run. +func (f *SeriesFile) EnableCompactions() { + for _, p := range f.partitions { + p.EnableCompactions() + } +} + +// DisableCompactions prevents new compactions from running. +func (f *SeriesFile) DisableCompactions() { + for _, p := range f.partitions { + p.DisableCompactions() + } +} + +// Wait waits for all Retains to be released. +func (f *SeriesFile) Wait() { + f.refs.Lock() + defer f.refs.Unlock() +} + // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. // The returned ids list returns values for new series and zero for existing series. func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, err error) { diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index ed15de9544..a145ea3f1f 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -35,7 +35,8 @@ type SeriesPartition struct { index *SeriesIndex seq uint64 // series id sequence - compacting bool + compacting bool + compactionsDisabled int CompactThreshold int @@ -249,7 +250,7 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio } // Check if we've crossed the compaction threshold. - if !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) { + if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) { p.compacting = true logger := p.Logger.With(zap.String("path", p.path)) logger.Info("beginning series partition compaction") @@ -362,6 +363,26 @@ func (p *SeriesPartition) SeriesCount() uint64 { return n } +func (p *SeriesPartition) DisableCompactions() { + p.mu.Lock() + defer p.mu.Unlock() + p.compactionsDisabled++ +} + +func (p *SeriesPartition) EnableCompactions() { + p.mu.Lock() + defer p.mu.Unlock() + + if p.compactionsEnabled() { + return + } + p.compactionsDisabled++ +} + +func (p *SeriesPartition) compactionsEnabled() bool { + return p.compactionsDisabled == 0 +} + // AppendSeriesIDs returns a list of all series ids. func (p *SeriesPartition) AppendSeriesIDs(a []uint64) []uint64 { for _, segment := range p.segments {