Merge pull request #15318 from influxdata/er-mv-comp-limiter

feat(storage): allow compaction limiter to be injected into engine
pull/15365/head
Edd Robinson 2019-10-09 13:11:44 +01:00 committed by GitHub
commit ef1e15a0ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 2 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/storage/wal"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/tsi1"
@ -131,6 +132,15 @@ func WithCompactionPlanner(planner tsm1.CompactionPlanner) Option {
}
}
// WithCompactionLimiter allows the caller to set the limiter that a storage
// engine uses. A typical use-case for this would be if multiple engines should
// share the same limiter.
func WithCompactionLimiter(limiter limiter.Fixed) Option {
return func(e *Engine) {
e.engine.WithCompactionLimiter(limiter)
}
}
// NewEngine initialises a new storage engine, including a series file, index and
// TSM engine.
func NewEngine(path string, c Config, options ...Option) *Engine {
@ -155,8 +165,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
e.wal.SetEnabled(c.WAL.Enabled)
// Initialise Engine
e.engine = tsm1.NewEngine(c.GetEnginePath(path), e.index, c.Engine,
tsm1.WithSnapshotter(e))
e.engine = tsm1.NewEngine(c.GetEnginePath(path), e.index, c.Engine, tsm1.WithSnapshotter(e))
// Apply options.
for _, option := range options {

View File

@ -231,6 +231,12 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
return e
}
// WithCompactionLimiter sets the compaction limiter, which is used to limit the
// number of concurrent compactions.
func (e *Engine) WithCompactionLimiter(limiter limiter.Fixed) {
e.compactionLimiter = limiter
}
func (e *Engine) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
e.Compactor.WithFormatFileNameFunc(formatFileNameFunc)
e.formatFileName = formatFileNameFunc