Allow SeriesFile compaction to be disabled
parent
d755daede8
commit
97f61e0ff4
|
@ -1206,6 +1206,11 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
|
|||
// filling up.
|
||||
e.disableLevelCompactions(true)
|
||||
defer e.enableLevelCompactions(true)
|
||||
|
||||
e.sfile.DisableCompactions()
|
||||
defer e.sfile.EnableCompactions()
|
||||
e.sfile.Wait()
|
||||
|
||||
disableOnce = true
|
||||
}
|
||||
|
||||
|
|
|
@ -111,6 +111,26 @@ func (f *SeriesFile) Retain() func() {
|
|||
return nop
|
||||
}
|
||||
|
||||
// EnableCompactions allows compactions to run.
|
||||
func (f *SeriesFile) EnableCompactions() {
|
||||
for _, p := range f.partitions {
|
||||
p.EnableCompactions()
|
||||
}
|
||||
}
|
||||
|
||||
// DisableCompactions prevents new compactions from running.
|
||||
func (f *SeriesFile) DisableCompactions() {
|
||||
for _, p := range f.partitions {
|
||||
p.DisableCompactions()
|
||||
}
|
||||
}
|
||||
|
||||
// Wait waits for all Retains to be released.
|
||||
func (f *SeriesFile) Wait() {
|
||||
f.refs.Lock()
|
||||
defer f.refs.Unlock()
|
||||
}
|
||||
|
||||
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
|
||||
// The returned ids list returns values for new series and zero for existing series.
|
||||
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, err error) {
|
||||
|
|
|
@ -35,7 +35,8 @@ type SeriesPartition struct {
|
|||
index *SeriesIndex
|
||||
seq uint64 // series id sequence
|
||||
|
||||
compacting bool
|
||||
compacting bool
|
||||
compactionsDisabled int
|
||||
|
||||
CompactThreshold int
|
||||
|
||||
|
@ -249,7 +250,7 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
|
|||
}
|
||||
|
||||
// Check if we've crossed the compaction threshold.
|
||||
if !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) {
|
||||
if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) {
|
||||
p.compacting = true
|
||||
logger := p.Logger.With(zap.String("path", p.path))
|
||||
logger.Info("beginning series partition compaction")
|
||||
|
@ -362,6 +363,26 @@ func (p *SeriesPartition) SeriesCount() uint64 {
|
|||
return n
|
||||
}
|
||||
|
||||
func (p *SeriesPartition) DisableCompactions() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.compactionsDisabled++
|
||||
}
|
||||
|
||||
func (p *SeriesPartition) EnableCompactions() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.compactionsEnabled() {
|
||||
return
|
||||
}
|
||||
p.compactionsDisabled++
|
||||
}
|
||||
|
||||
func (p *SeriesPartition) compactionsEnabled() bool {
|
||||
return p.compactionsDisabled == 0
|
||||
}
|
||||
|
||||
// AppendSeriesIDs returns a list of all series ids.
|
||||
func (p *SeriesPartition) AppendSeriesIDs(a []uint64) []uint64 {
|
||||
for _, segment := range p.segments {
|
||||
|
|
Loading…
Reference in New Issue