feat(storage): Limit concurrent series partition compaction (#14240)

* feat(storage): Limit concurrent series partition snapshots

* feat: make concurrency configurable

* fix: integrate review feedback

* refactor: rename config value
pull/14272/head
Jacob Marble 2019-07-30 10:34:06 -07:00 committed by GitHub
parent e6b14b93da
commit b731cc5e60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 24 deletions

View File

@ -69,6 +69,11 @@ const (
// DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache in the TSI index.
DefaultSeriesIDSetCacheSize = 100
// DefaultSeriesFileMaxConcurrentSnapshotCompactions is the maximum number of concurrent series
// partition snapshot compactions that can run at one time.
// A value of 0 results in runtime.GOMAXPROCS(0).
DefaultSeriesFileMaxConcurrentSnapshotCompactions = 0
)
// Config holds the configuration for the tsdb package.
@ -128,6 +133,13 @@ type Config struct {
// Setting series-id-set-cache-size to 0 disables the cache.
SeriesIDSetCacheSize int `toml:"series-id-set-cache-size"`
// SeriesFileMaxConcurrentSnapshotCompactions is the maximum number of concurrent snapshot compactions
// that can be running at one time across all series partitions in a database. Snapshots scheduled
// to run when the limit is reached are blocked until a running snapshot completes. Only snapshot
// compactions are affected by this limit. A value of 0 limits snapshot compactions to the lesser of
// 8 (series file partition quantity) and runtime.GOMAXPROCS(0).
SeriesFileMaxConcurrentSnapshotCompactions int `toml:"series-file-max-concurrent-snapshot-compactions"`
TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
// TSMWillNeed controls whether we hint to the kernel that we intend to
@ -159,6 +171,8 @@ func NewConfig() Config {
MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
SeriesIDSetCacheSize: DefaultSeriesIDSetCacheSize,
SeriesFileMaxConcurrentSnapshotCompactions: DefaultSeriesFileMaxConcurrentSnapshotCompactions,
TraceLoggingEnabled: false,
TSMWillNeed: false,
}
@ -180,6 +194,10 @@ func (c *Config) Validate() error {
return errors.New("series-id-set-cache-size must be non-negative")
}
// Reject negative limits. The message names the setting by its TOML key so
// the operator can find the offending entry in the configuration file.
if c.SeriesFileMaxConcurrentSnapshotCompactions < 0 {
	return errors.New("series-file-max-concurrent-snapshot-compactions must be non-negative")
}
valid := false
for _, e := range RegisteredEngines() {
if e == c.Engine {
@ -208,17 +226,18 @@ func (c *Config) Validate() error {
// Diagnostics returns a diagnostics representation of a subset of the Config.
//
// The result is built with diagnostics.RowFromMap from a map of setting names
// to the corresponding Config field values. The error returned here is always
// nil.
//
// NOTE(review): the "series-file-max-concurrent-compactions" key below does not
// match the field's TOML name ("series-file-max-concurrent-snapshot-compactions");
// confirm whether the difference is intentional.
func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
return diagnostics.RowFromMap(map[string]interface{}{
"dir": c.Dir,
"wal-dir": c.WALDir,
"wal-fsync-delay": c.WALFsyncDelay,
"cache-max-memory-size": c.CacheMaxMemorySize,
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
"compact-full-write-cold-duration": c.CompactFullWriteColdDuration,
"max-series-per-database": c.MaxSeriesPerDatabase,
"max-values-per-tag": c.MaxValuesPerTag,
"max-concurrent-compactions": c.MaxConcurrentCompactions,
"max-index-log-file-size": c.MaxIndexLogFileSize,
"series-id-set-cache-size": c.SeriesIDSetCacheSize,
"dir": c.Dir,
"wal-dir": c.WALDir,
"wal-fsync-delay": c.WALFsyncDelay,
"cache-max-memory-size": c.CacheMaxMemorySize,
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
"compact-full-write-cold-duration": c.CompactFullWriteColdDuration,
"max-series-per-database": c.MaxSeriesPerDatabase,
"max-values-per-tag": c.MaxValuesPerTag,
"max-concurrent-compactions": c.MaxConcurrentCompactions,
"max-index-log-file-size": c.MaxIndexLogFileSize,
"series-id-set-cache-size": c.SeriesIDSetCacheSize,
"series-file-max-concurrent-compactions": c.SeriesFileMaxConcurrentSnapshotCompactions,
}), nil
}

View File

@ -7,12 +7,14 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"sync"
"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/binaryutil"
"github.com/influxdata/influxdb/pkg/limiter"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@ -35,6 +37,8 @@ type SeriesFile struct {
path string
partitions []*SeriesPartition
maxSnapshotConcurrency int
refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use.
Logger *zap.Logger
@ -42,10 +46,27 @@ type SeriesFile struct {
// NewSeriesFile returns a new instance of SeriesFile for the given path.
//
// The snapshot compaction concurrency limit defaults to runtime.GOMAXPROCS(0),
// floored at 1, and the logger defaults to a no-op logger.
func NewSeriesFile(path string) *SeriesFile {
return &SeriesFile{
path: path,
Logger: zap.NewNop(),
// Default the limit to GOMAXPROCS, never letting it fall below 1.
maxSnapshotConcurrency := runtime.GOMAXPROCS(0)
if maxSnapshotConcurrency < 1 {
maxSnapshotConcurrency = 1
}
return &SeriesFile{
path: path,
maxSnapshotConcurrency: maxSnapshotConcurrency,
Logger: zap.NewNop(),
}
}
// WithMaxCompactionConcurrency sets the limit on concurrently running snapshot
// compactions for this series file. Any value below 1 selects
// runtime.GOMAXPROCS(0) instead, and the stored limit is never less than 1.
func (f *SeriesFile) WithMaxCompactionConcurrency(maxCompactionConcurrency int) {
	limit := maxCompactionConcurrency
	if limit <= 0 {
		// A non-positive request means "choose automatically".
		limit = runtime.GOMAXPROCS(0)
	}
	if limit <= 0 {
		// Guard against a non-positive automatic value.
		limit = 1
	}
	f.maxSnapshotConcurrency = limit
}
// Open memory maps the data file at the file's path.
@ -59,10 +80,13 @@ func (f *SeriesFile) Open() error {
return err
}
// Limit concurrent series file compactions
compactionLimiter := limiter.NewFixed(f.maxSnapshotConcurrency)
// Open partitions.
f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
for i := 0; i < SeriesFilePartitionN; i++ {
p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
p := NewSeriesPartition(i, f.SeriesPartitionPath(i), compactionLimiter)
p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
if err := p.Open(); err != nil {
f.Logger.Error("Unable to open series file",

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/pkg/rhh"
"go.uber.org/zap"
)
@ -40,6 +41,7 @@ type SeriesPartition struct {
seq uint64 // series id sequence
compacting bool
compactionLimiter limiter.Fixed
compactionsDisabled int
CompactThreshold int
@ -48,14 +50,15 @@ type SeriesPartition struct {
}
// NewSeriesPartition returns a new instance of SeriesPartition.
//
// The partition records the given id and path, starts its series id sequence
// at id+1, uses DefaultSeriesPartitionCompactThreshold as its compaction
// threshold, and logs to a no-op logger until Logger is replaced. The supplied
// compactionLimiter is stored on the partition; a token is taken from it
// (TryTake) before a compaction is started and released when the compaction
// goroutine finishes.
func NewSeriesPartition(id int, path string) *SeriesPartition {
func NewSeriesPartition(id int, path string, compactionLimiter limiter.Fixed) *SeriesPartition {
return &SeriesPartition{
id: id,
path: path,
closing: make(chan struct{}),
CompactThreshold: DefaultSeriesPartitionCompactThreshold,
Logger: zap.NewNop(),
seq: uint64(id) + 1,
id: id,
path: path,
closing: make(chan struct{}),
compactionLimiter: compactionLimiter,
CompactThreshold: DefaultSeriesPartitionCompactThreshold,
Logger: zap.NewNop(),
seq: uint64(id) + 1,
}
}
@ -255,13 +258,16 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
}
// Check if we've crossed the compaction threshold.
if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) {
if p.compactionsEnabled() && !p.compacting &&
p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) &&
p.compactionLimiter.TryTake() {
p.compacting = true
log, logEnd := logger.NewOperation(p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path))
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer p.compactionLimiter.Release()
compactor := NewSeriesPartitionCompactor()
compactor.cancel = p.closing

View File

@ -511,6 +511,7 @@ func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
}
sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileDirectory))
sfile.WithMaxCompactionConcurrency(s.EngineOptions.Config.SeriesFileMaxConcurrentSnapshotCompactions)
sfile.Logger = s.baseLogger
if err := sfile.Open(); err != nil {
return nil, err