Add tsm1 open limiter

This commit restricts the number of TSM1 files that can be opened
concurrently across the entire `tsdb.Store`. There is currently
a limit for the number of shards that can be opened concurrently,
however, this limit does not help when the number of CPU cores
is higher than the number of shards. Because TSM1 files have a 2GB
limit and there is no limit on the number of files per shard,
extremely large shards (1TB+) can load 1,000s of files simultaneously.
pull/9918/head
Ben Johnson 2018-05-29 10:15:48 -06:00
parent 2b3cd8406f
commit d3e3b05a49
No known key found for this signature in database
GPG Key ID: 81741CD251883081
4 changed files with 16 additions and 0 deletions

View File

@ -162,6 +162,9 @@ type EngineOptions struct {
ShardID uint64
InmemIndex interface{} // shared in-memory index
// Limits the concurrent number of TSM files that can be loaded at once.
OpenLimiter limiter.Fixed
CompactionPlannerCreator CompactionPlannerCreator
CompactionLimiter limiter.Fixed
CompactionThroughputLimiter limiter.Rate

View File

@ -204,6 +204,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
}
fs := NewFileStore(path)
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}

View File

@ -172,6 +172,8 @@ type FileStore struct {
files []TSMFile
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
logger *zap.Logger // Logger to be used for important messages
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
@ -217,6 +219,7 @@ func NewFileStore(dir string) *FileStore {
lastModified: time.Time{},
logger: logger,
traceLogger: logger,
openLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
stats: &FileStoreStatistics{},
purger: &purger{
files: map[string]TSMFile{},
@ -500,6 +503,12 @@ func (f *FileStore) Open() error {
}
go func(idx int, file *os.File) {
// Ensure a limited number of TSM files are loaded at once.
// Systems which have very large datasets (1TB+) can have thousands
// of TSM files which can cause extremely long load times.
f.openLimiter.Take()
defer f.openLimiter.Release()
start := time.Now()
df, err := NewTSMReader(file)
f.logger.Info("Opened file",

View File

@ -206,6 +206,9 @@ func (s *Store) loadShards() error {
err error
}
// Limit the number of concurrent TSM files to be opened to the number of cores.
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
// Setup a shared limiter for compactions
lim := s.EngineOptions.Config.MaxConcurrentCompactions
if lim == 0 {