From 80fef4af4a03b4c832442d6676862a87fbee8b96 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 1 May 2017 09:14:04 -0600 Subject: [PATCH 01/10] Enable shards after loading Compactions are enabled as soon as the shard is opened. This can slow down startup or cause the system to spike in CPU usage at startup if many shards need to be compacted. This now delays compactions until after they are loaded. --- tsdb/store.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tsdb/store.go b/tsdb/store.go index 443001df96..b0e3905300 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -224,6 +224,9 @@ func (s *Store) loadShards() error { // Open engine. shard := NewShard(shardID, path, walPath, opt) + + // Disable compactions, writes and queries until all shards are loaded + shard.EnableOnOpen = false shard.WithLogger(s.baseLogger) err = shard.Open() @@ -251,6 +254,12 @@ func (s *Store) loadShards() error { s.databases[res.s.database] = struct{}{} } close(resC) + + // Enable all shards + for _, sh := range s.shards { + sh.SetEnabled(true) + } + return nil } From 8fc9853ed80f3ea9b37293565318eef57ffb7ba8 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 1 May 2017 11:11:29 -0600 Subject: [PATCH 02/10] Add max-concurrent-compactions limit This limit allows the number of concurrent level and full compactions to be throttled. Snapshot compactions are not affected by this limit as they need to run continuously. This limit can be used to control how much CPU is consumed by compactions. The default is to limit to the number of CPU available. 
--- etc/config.sample.toml | 5 +++++ tsdb/config.go | 20 ++++++++++++++++++-- tsdb/engine.go | 10 ++++++---- tsdb/engine/tsm1/engine.go | 16 +++++++++++++++- tsdb/store.go | 7 +++++++ 5 files changed, 51 insertions(+), 7 deletions(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e380aaeacd..25d14c239a 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -89,6 +89,11 @@ # write or delete # compact-full-write-cold-duration = "4h" + # The maximum number of concurrent full and level compactions that can run at one time. A + # value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply + # to cache snapshotting. + # max-concurrent-compactions = 0 + # The maximum series allowed per database before writes are dropped. This limit can prevent # high cardinality issues at the database level. This limit can be disabled by setting it to # 0. diff --git a/tsdb/config.go b/tsdb/config.go index 8971f76843..d1596bb8a3 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -47,6 +47,10 @@ const ( // DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement. DefaultMaxValuesPerTag = 100000 + + // DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions + // that can run at one time. A value of 0 results in runtime.GOMAXPROCS(0) used at runtime. + DefaultMaxConcurrentCompactions = 0 ) // Config holds the configuration for the tsbd package. @@ -84,6 +88,12 @@ type Config struct { // A value of 0 disables the limit. MaxValuesPerTag int `toml:"max-values-per-tag"` + // MaxConcurrentCompactions is the maximum number of concurrent level and full compactions + // that can be running at one time across all shards. Compactions scheduled to run when the + // limit is reached are blocked until a running compaction completes. Snapshot compactions are + // not affected by this limit. A value of 0 limits compactions to runtime.GOMAXPROCS(0). 
+ MaxConcurrentCompactions int `toml:"max-concurrent-compactions"` + TraceLoggingEnabled bool `toml:"trace-logging-enabled"` } @@ -100,8 +110,9 @@ func NewConfig() Config { CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration), CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration), - MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase, - MaxValuesPerTag: DefaultMaxValuesPerTag, + MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase, + MaxValuesPerTag: DefaultMaxValuesPerTag, + MaxConcurrentCompactions: DefaultMaxConcurrentCompactions, TraceLoggingEnabled: false, } @@ -115,6 +126,10 @@ func (c *Config) Validate() error { return errors.New("Data.WALDir must be specified") } + if c.MaxConcurrentCompactions < 0 { + return errors.New("max-concurrent-compactions must be non-negative") + } + valid := false for _, e := range RegisteredEngines() { if e == c.Engine { @@ -152,5 +167,6 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) { "compact-full-write-cold-duration": c.CompactFullWriteColdDuration, "max-series-per-database": c.MaxSeriesPerDatabase, "max-values-per-tag": c.MaxValuesPerTag, + "max-concurrent-compactions": c.MaxConcurrentCompactions, }), nil } diff --git a/tsdb/engine.go b/tsdb/engine.go index a17410c642..d885dbc0db 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/uber-go/zap" ) @@ -136,10 +137,11 @@ func NewEngine(id uint64, i Index, path string, walPath string, options EngineOp // EngineOptions represents the options used to initialize the engine. 
type EngineOptions struct { - EngineVersion string - IndexVersion string - ShardID uint64 - InmemIndex interface{} // shared in-memory index + EngineVersion string + IndexVersion string + ShardID uint64 + InmemIndex interface{} // shared in-memory index + CompactionLimiter limiter.Fixed Config Config } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 125a27e856..99ccb7a5ad 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -23,6 +23,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/tsdb" _ "github.com/influxdata/influxdb/tsdb/index" "github.com/uber-go/zap" @@ -132,6 +133,9 @@ type Engine struct { enableCompactionsOnOpen bool stats *EngineStatistics + + // The limiter for concurrent compactions + compactionLimiter limiter.Fixed } // NewEngine returns a new instance of Engine. @@ -171,7 +175,8 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb. CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize, CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration), enableCompactionsOnOpen: true, - stats: &EngineStatistics{}, + stats: &EngineStatistics{}, + compactionLimiter: opt.CompactionLimiter, } // Attach fieldset to index. @@ -1205,6 +1210,7 @@ type compactionStrategy struct { logger zap.Logger compactor *Compactor fileStore *FileStore + limiter limiter.Fixed } // Apply concurrently compacts all the groups in a compaction strategy. @@ -1226,6 +1232,12 @@ func (s *compactionStrategy) Apply() { // compactGroup executes the compaction strategy against a single CompactionGroup. 
func (s *compactionStrategy) compactGroup(groupNum int) { + // Limit concurrent compactions if we have a limiter + if cap(s.limiter) > 0 { + s.limiter.Take() + defer s.limiter.Release() + } + group := s.compactionGroups[groupNum] start := time.Now() s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group))) @@ -1290,6 +1302,7 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate fileStore: e.FileStore, compactor: e.Compactor, fast: fast, + limiter: e.compactionLimiter, description: fmt.Sprintf("level %d", level), activeStat: &e.stats.TSMCompactionsActive[level-1], @@ -1320,6 +1333,7 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy { fileStore: e.FileStore, compactor: e.Compactor, fast: optimize, + limiter: e.compactionLimiter, } if optimize { diff --git a/tsdb/store.go b/tsdb/store.go index b0e3905300..4a01776694 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -158,6 +158,13 @@ func (s *Store) loadShards() error { t := limiter.NewFixed(runtime.GOMAXPROCS(0)) + // Setup a shared limiter for compactions + lim := s.EngineOptions.Config.MaxConcurrentCompactions + if lim == 0 { + lim = runtime.GOMAXPROCS(0) + } + s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim) + resC := make(chan *res) var n int From 3d1c0cd9812e2f6b86c50ed131889e7828d95991 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 1 May 2017 12:35:22 -0600 Subject: [PATCH 03/10] Don't return compaction plans for files already part of a plan The compactor prevents the same file from being compacted by different compaction runs, but it can result in warning errors in the logs that are confusing. This adds compaction plan tracking to the planner so that files are only part of one plan at a given time. 
--- tsdb/engine/tsm1/compact.go | 82 ++++++++++-- tsdb/engine/tsm1/compact_test.go | 208 ++++++++++++++++--------------- tsdb/engine/tsm1/engine.go | 15 ++- tsdb/engine/tsm1/engine_test.go | 1 + 4 files changed, 188 insertions(+), 118 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 562624a5a3..4ca1273633 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -53,6 +53,7 @@ type CompactionPlanner interface { Plan(lastWrite time.Time) []CompactionGroup PlanLevel(level int) []CompactionGroup PlanOptimize() []CompactionGroup + Release(group []CompactionGroup) } // DefaultPlanner implements CompactionPlanner using a strategy to roll up @@ -60,17 +61,13 @@ type CompactionPlanner interface { // to minimize the number of TSM files on disk while rolling up a bounder number // of files. type DefaultPlanner struct { - FileStore interface { - Stats() []FileStat - LastModified() time.Time - BlockCount(path string, idx int) int - } + FileStore fileStore - // CompactFullWriteColdDuration specifies the length of time after + // compactFullWriteColdDuration specifies the length of time after // which if no writes have been committed to the WAL, the engine will // do a full compaction of the TSM files in this shard. This duration // should always be greater than the CacheFlushWriteColdDuraion - CompactFullWriteColdDuration time.Duration + compactFullWriteColdDuration time.Duration // lastPlanCheck is the last time Plan was called lastPlanCheck time.Time @@ -81,6 +78,24 @@ type DefaultPlanner struct { // lastGenerations is the last set of generations found by findGenerations lastGenerations tsmGenerations + + // filesInUse is the set of files that have been returned as part of a plan and might + // be being compacted. Two plans should not return the same file at any given time. 
+ filesInUse map[string]struct{} +} + +type fileStore interface { + Stats() []FileStat + LastModified() time.Time + BlockCount(path string, idx int) int +} + +func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner { + return &DefaultPlanner{ + FileStore: fs, + compactFullWriteColdDuration: writeColdDuration, + filesInUse: make(map[string]struct{}), + } } // tsmGeneration represents the TSM files within a generation. @@ -205,6 +220,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { } } + if !c.acquire(cGroups) { + return nil + } + return cGroups } @@ -270,6 +289,10 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup { cGroups = append(cGroups, cGroup) } + if !c.acquire(cGroups) { + return nil + } + return cGroups } @@ -279,7 +302,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { generations := c.findGenerations() // first check if we should be doing a full compaction because nothing has been written in a long time - if c.CompactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.CompactFullWriteColdDuration && len(generations) > 1 { + if c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 { var tsmFiles []string var genCount int for i, group := range generations { @@ -316,7 +339,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { return nil } - return []CompactionGroup{tsmFiles} + group := []CompactionGroup{tsmFiles} + if !c.acquire(group) { + return nil + } + return group } // don't plan if nothing has changed in the filestore @@ -449,6 +476,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { tsmFiles = append(tsmFiles, cGroup) } + if !c.acquire(tsmFiles) { + return nil + } return tsmFiles } @@ -496,6 +526,40 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations { return orderedGenerations } +func (c *DefaultPlanner) acquire(groups []CompactionGroup) bool { 
+ c.mu.Lock() + defer c.mu.Unlock() + + // See if the new files are already in use + for _, g := range groups { + for _, f := range g { + if _, ok := c.filesInUse[f]; ok { + return false + } + } + } + + // Mark all the new files in use + for _, g := range groups { + for _, f := range g { + c.filesInUse[f] = struct{}{} + } + } + return true +} + +// Release removes the files reference in each compaction group allowing new plans +// to be able to use them. +func (c *DefaultPlanner) Release(groups []CompactionGroup) { + c.mu.Lock() + defer c.mu.Unlock() + for _, g := range groups { + for _, f := range g { + delete(c.filesInUse, f) + } + } +} + // Compactor merges multiple TSM files into new files or // writes a Cache into 1 or more TSM files. type Compactor struct { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 893b61519e..502e42efa5 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) @@ -1090,8 +1091,8 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { } func TestDefaultPlanner_Plan_Min(t *testing.T) { - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return []tsm1.FileStat{ tsm1.FileStat{ @@ -1108,8 +1109,8 @@ func TestDefaultPlanner_Plan_Min(t *testing.T) { }, } }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { @@ -1151,13 +1152,13 @@ func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := 
cp.Plan(time.Now()) @@ -1213,13 +1214,11 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data - }, + cp := tsm1.NewDefaultPlanner(&fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data }, - } + }, tsdb.DefaultCompactFullWriteColdDuration) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} @@ -1280,13 +1279,13 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[4], data[5]} tsm := cp.PlanLevel(1) @@ -1333,13 +1332,13 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4]} tsm := cp.PlanLevel(3) @@ -1382,13 +1381,13 @@ func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[2], data[3]} tsm := cp.PlanLevel(1) @@ -1435,13 +1434,13 @@ func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm 
:= cp.PlanLevel(3) @@ -1478,13 +1477,13 @@ func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanLevel(3) @@ -1510,13 +1509,13 @@ func TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanLevel(2) @@ -1554,13 +1553,13 @@ func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1]} tsm := cp.PlanLevel(3) @@ -1603,13 +1602,13 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} expFiles2 := []tsm1.FileStat{data[4], data[5]} @@ -1652,13 +1651,13 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanOptimize() @@ -1695,13 +1694,13 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) { }, } - cp := 
&tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := cp.PlanOptimize() @@ -1760,13 +1759,13 @@ func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} expFiles2 := []tsm1.FileStat{data[5], data[6], data[7], data[8]} @@ -1813,13 +1812,13 @@ func TestDefaultPlanner_PlanOptimize_Optimized(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanOptimize() @@ -1845,13 +1844,13 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2]} tsm := cp.PlanOptimize() @@ -1897,14 +1896,14 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, }, - CompactFullWriteColdDuration: time.Nanosecond, - } + time.Nanosecond, + ) tsm := cp.Plan(time.Now().Add(-time.Second)) if exp, got := len(data), len(tsm[0]); got != exp { @@ -1932,13 +1931,13 @@ func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) { 
}, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { @@ -1975,15 +1974,13 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { blockCount: 1000, } - cp := &tsm1.DefaultPlanner{ - FileStore: fs, - CompactFullWriteColdDuration: time.Nanosecond, - } - + cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond) + plan := cp.Plan(time.Now().Add(-time.Second)) // first verify that our test set would return files - if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp { + if exp, got := 4, len(plan[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) // skip planning if all files are over the limit over := []tsm1.FileStat{ @@ -2017,14 +2014,18 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { } cp.FileStore = overFs - if exp, got := 0, len(cp.Plan(time.Now().Add(-time.Second))); got != exp { + plan = cp.Plan(time.Now().Add(-time.Second)) + if exp, got := 0, len(plan); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) + plan = cp.PlanOptimize() // ensure the optimize planner would pick this up - if exp, got := 1, len(cp.PlanOptimize()); got != exp { + if exp, got := 1, len(plan); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) cp.FileStore = fs // ensure that it will plan if last modified has changed @@ -2082,15 +2083,14 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ blockCount: 1000, PathsFn: func() []tsm1.FileStat { return data }, }, - CompactFullWriteColdDuration: time.Hour, - } + time.Hour) tsm := 
cp.Plan(time.Now().Add(-24 * time.Hour)) if exp, got := 1, len(tsm); got != exp { @@ -2127,15 +2127,17 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { blockCount: 100, } - cp := &tsm1.DefaultPlanner{ - FileStore: fs, - CompactFullWriteColdDuration: time.Nanosecond, - } + cp := tsm1.NewDefaultPlanner( + fs, + time.Nanosecond, + ) + plan := cp.Plan(time.Now().Add(-time.Second)) // first verify that our test set would return files - if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp { + if exp, got := 4, len(plan[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) // skip planning if all files are over the limit over := []tsm1.FileStat{ @@ -2188,13 +2190,13 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := cp.Plan(time.Now()) @@ -2210,8 +2212,8 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { } func TestDefaultPlanner_Plan_LargeSets(t *testing.T) { - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return []tsm1.FileStat{ tsm1.FileStat{ @@ -2236,8 +2238,8 @@ func TestDefaultPlanner_Plan_LargeSets(t *testing.T) { }, } }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { @@ -2246,8 +2248,8 @@ func TestDefaultPlanner_Plan_LargeSets(t *testing.T) { } func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return []tsm1.FileStat{ tsm1.FileStat{ 
@@ -2272,8 +2274,8 @@ func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { }, } }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 99ccb7a5ad..c80a69ec02 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -165,12 +165,9 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb. WAL: w, Cache: cache, - FileStore: fs, - Compactor: c, - CompactionPlan: &DefaultPlanner{ - FileStore: fs, - CompactFullWriteColdDuration: time.Duration(opt.Config.CompactFullWriteColdDuration), - }, + FileStore: fs, + Compactor: c, + CompactionPlan: NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration)), CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize, CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration), @@ -1170,8 +1167,12 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { case <-t.C: s := e.levelCompactionStrategy(fast, level) if s != nil { + // Release the files in the compaction plan + defer e.CompactionPlan.Release(s.compactionGroups) + s.Apply() } + } } } @@ -1188,6 +1189,8 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) { case <-t.C: s := e.fullCompactionStrategy() if s != nil { + // Release the files in the compaction plan + defer e.CompactionPlan.Release(s.compactionGroups) s.Apply() } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 63b385f194..b2f941ba30 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1059,6 +1059,7 @@ type mockPlanner struct{} func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil } func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil } func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil } 
+func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} // ParseTags returns an instance of Tags for a comma-delimited list of key/values. func ParseTags(s string) influxql.Tags { From f87fd7c7ed397209c855fcd977f407db85694613 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 2 May 2017 09:20:01 -0600 Subject: [PATCH 04/10] Stop background compaction goroutines when shard is cold Each shard has a number of goroutines for compacting different levels of TSM files. When a shard goes cold and is fully compacted, these goroutines are still running. This change will stop background shard goroutines when the shard goes cold and start them back up if new writes arrive. --- tsdb/engine.go | 3 +++ tsdb/engine/tsm1/compact.go | 7 +++++++ tsdb/engine/tsm1/engine.go | 15 +++++++++++++++ tsdb/engine/tsm1/engine_test.go | 1 + tsdb/engine/tsm1/file_store.go | 31 ++++++++++++++++++------------- tsdb/engine/tsm1/reader.go | 5 +++++ tsdb/shard.go | 23 ++++++++++++++++++++++- tsdb/store.go | 27 +++++++++++++++++++++++++++ 8 files changed, 98 insertions(+), 14 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index d885dbc0db..71bff1e603 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -31,6 +31,8 @@ type Engine interface { Open() error Close() error SetEnabled(enabled bool) + SetCompactionsEnabled(enabled bool) + WithLogger(zap.Logger) LoadMetadataIndex(shardID uint64, index Index) error @@ -72,6 +74,7 @@ type Engine interface { // Statistics will return statistics relevant to this engine. 
Statistics(tags map[string]string) []models.Statistic LastModified() time.Time + IsIdle() bool io.WriterTo } diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 4ca1273633..7aba38a7af 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -54,6 +54,7 @@ type CompactionPlanner interface { PlanLevel(level int) []CompactionGroup PlanOptimize() []CompactionGroup Release(group []CompactionGroup) + FullyCompacted() bool } // DefaultPlanner implements CompactionPlanner using a strategy to roll up @@ -144,6 +145,12 @@ func (t *tsmGeneration) hasTombstones() bool { return false } +// FullyCompacted returns true if the shard is fully compacted. +func (c *DefaultPlanner) FullyCompacted() bool { + gens := c.findGenerations() + return len(gens) <= 1 && !gens.hasTombstones() +} + // PlanLevel returns a set of TSM files to rewrite for a specific level. func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { // Determine the generations from all files on disk. We need to treat diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c80a69ec02..3a5a98a93e 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -528,6 +528,21 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { return nil } +// IsIdle returns true if the cache is empty, there are no running compactions and the +// shard is fully compacted. 
+func (e *Engine) IsIdle() bool { + cacheEmpty := e.Cache.Size() == 0 + + runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive) + + return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted() +} + // Backup writes a tar archive of any TSM files modified since the passed // in time to the passed in writer. The basePath will be prepended to the names // of the files in the archive. It will force a snapshot of the WAL first diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index b2f941ba30..8254a37ac3 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1060,6 +1060,7 @@ func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil } func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil } func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} +func (m *mockPlanner) FullyCompacted() bool { return false } // ParseTags returns an instance of Tags for a comma-delimited list of key/values. func ParseTags(s string) influxql.Tags { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index decb843083..34a3569d86 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -317,13 +317,17 @@ func (f *FileStore) Delete(keys []string) error { // DeleteRange removes the values for keys between timestamps min and max. 
func (f *FileStore) DeleteRange(keys []string, min, max int64) error { + if err := f.walkFiles(func(tsm TSMFile) error { + return tsm.DeleteRange(keys, min, max) + }); err != nil { + return err + } + f.mu.Lock() f.lastModified = time.Now().UTC() + f.lastFileStats = nil f.mu.Unlock() - - return f.walkFiles(func(tsm TSMFile) error { - return tsm.DeleteRange(keys, min, max) - }) + return nil } // Open loads all the TSM files in the configured directory. @@ -382,15 +386,6 @@ func (f *FileStore) Open() error { return fmt.Errorf("error opening file %s: %v", fn, err) } - // Accumulate file store size stat - fi, err := file.Stat() - if err == nil { - atomic.AddInt64(&f.stats.DiskBytes, fi.Size()) - if fi.ModTime().UTC().After(f.lastModified) { - f.lastModified = fi.ModTime().UTC() - } - } - go func(idx int, file *os.File) { start := time.Now() df, err := NewTSMReader(file) @@ -404,6 +399,7 @@ func (f *FileStore) Open() error { }(i, file) } + var lm int64 for range files { res := <-readerC if res.err != nil { @@ -411,7 +407,16 @@ func (f *FileStore) Open() error { return res.err } f.files = append(f.files, res.r) + // Accumulate file store size stats + atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size())) + + // Re-initialize the lastModified time for the file store + if res.r.LastModified() > lm { + lm = res.r.LastModified() + } + } + f.lastModified = time.Unix(0, lm) close(readerC) sort.Sort(tsmReaders(f.files)) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 0c9d277914..7e5abf3b99 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -465,6 +465,11 @@ func (t *TSMReader) Size() uint32 { func (t *TSMReader) LastModified() int64 { t.mu.RLock() lm := t.lastModified + for _, ts := range t.tombstoner.TombstoneFiles() { + if ts.LastModified > lm { + lm = ts.LastModified + } + } t.mu.RUnlock() return lm } diff --git a/tsdb/shard.go b/tsdb/shard.go index 680245f997..c03030093e 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ 
-110,6 +110,7 @@ type Shard struct { path string walPath string id uint64 + wg sync.WaitGroup database string retentionPolicy string @@ -288,6 +289,7 @@ func (s *Shard) Open() error { } s.engine = e + s.wg.Add(1) go s.monitor() return nil @@ -335,6 +337,7 @@ func (s *Shard) close(clean bool) error { default: close(s.closing) } + s.wg.Wait() if clean { // Don't leak our shard ID and series keys in the index @@ -380,6 +383,23 @@ func (s *Shard) UnloadIndex() { s.index.RemoveShard(s.id) } +// IsIdle return true if the shard is not receiving writes and is fully compacted. +func (s *Shard) IsIdle() bool { + if err := s.ready(); err != nil { + return true + } + + return s.engine.IsIdle() +} + +// SetCompactionsEnabled enables or disable shard background compactions. +func (s *Shard) SetCompactionsEnabled(enabled bool) { + if err := s.ready(); err != nil { + return + } + s.engine.SetCompactionsEnabled(enabled) +} + // DiskSize returns the size on disk of this shard func (s *Shard) DiskSize() (int64, error) { var size int64 @@ -965,6 +985,8 @@ func (s *Shard) CreateSnapshot() (string, error) { } func (s *Shard) monitor() { + defer s.wg.Done() + t := time.NewTicker(monitorStatInterval) defer t.Stop() t2 := time.NewTicker(time.Minute) @@ -976,7 +998,6 @@ func (s *Shard) monitor() { case <-s.closing: return case <-t.C: - // Checking DiskSize can be expensive with a lot of shards and TSM files, only // check if something has changed. 
lm := s.LastModified() diff --git a/tsdb/store.go b/tsdb/store.go index 4a01776694..3abe51f3f9 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -145,6 +145,8 @@ func (s *Store) Open() error { } s.opened = true + s.wg.Add(1) + go s.monitorShards() return nil } @@ -265,6 +267,9 @@ func (s *Store) loadShards() error { // Enable all shards for _, sh := range s.shards { sh.SetEnabled(true) + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } } return nil @@ -1046,6 +1051,28 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err return tagValues, nil } +func (s *Store) monitorShards() { + defer s.wg.Done() + t := time.NewTicker(10 * time.Second) + defer t.Stop() + for { + select { + case <-s.closing: + return + case <-t.C: + s.mu.RLock() + for _, sh := range s.shards { + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } else { + sh.SetCompactionsEnabled(true) + } + } + s.mu.RUnlock() + } + } +} + // KeyValue holds a string key and a string value. type KeyValue struct { Key, Value string From 684f5d884a613b45aa442084e2b2d4c730084fcd Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 2 May 2017 10:56:17 -0600 Subject: [PATCH 05/10] Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b628c8e3c8..df45076e88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio - [#8302](https://github.com/influxdata/influxdb/pull/8302): Write throughput/concurrency improvements - [#8273](https://github.com/influxdata/influxdb/issues/8273): Remove the admin UI. - [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1 +- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits ### Bugfixes @@ -54,6 +55,8 @@ The admin UI is removed and unusable in this release. 
The `[admin]` configuratio - [#8315](https://github.com/influxdata/influxdb/issues/8315): Remove default upper time bound on DELETE queries. - [#8066](https://github.com/influxdata/influxdb/issues/8066): Fix LIMIT and OFFSET for certain aggregate queries. - [#8045](https://github.com/influxdata/influxdb/issues/8045): Refactor the subquery code and fix outer condition queries. +- [#7425](https://github.com/influxdata/influxdb/issues/7425): Fix compaction aborted log messages +- [#8123](https://github.com/influxdata/influxdb/issues/8123): TSM compaction does not remove .tmp on error ## v1.2.3 [unreleased] From 88848a942653293acaa73366457c2c01426dc526 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 2 May 2017 22:42:09 -0600 Subject: [PATCH 06/10] Remove per shard monitor goroutine The monitor goroutine ran for each shard and updated disk stats as well as logged cardinality warnings. This goroutine has been removed by making the disks stats more lightweight and callable direclty from Statisics and move the logging to the tsdb.Store. The latter allows one goroutine to handle all shards. --- tsdb/engine.go | 1 + tsdb/engine/tsm1/engine.go | 5 ++ tsdb/engine/tsm1/file_store.go | 11 ++++ tsdb/engine/tsm1/wal.go | 4 ++ tsdb/shard.go | 104 ++++++--------------------------- tsdb/store.go | 41 +++++++++++++ 6 files changed, 79 insertions(+), 87 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index 71bff1e603..a81144d07a 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -74,6 +74,7 @@ type Engine interface { // Statistics will return statistics relevant to this engine. 
Statistics(tags map[string]string) []models.Statistic LastModified() time.Time + DiskSize() int64 IsIdle() bool io.WriterTo diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 3a5a98a93e..cd665e5bbe 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -430,6 +430,11 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { return statistics } +// DiskSize returns the total size in bytes of all TSM and WAL segments on disk. +func (e *Engine) DiskSize() int64 { + return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() +} + // Open opens and initializes the engine. func (e *Engine) Open() error { if err := os.MkdirAll(e.path, 0777); err != nil { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 34a3569d86..e390535770 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -409,6 +409,9 @@ func (f *FileStore) Open() error { f.files = append(f.files, res.r) // Accumulate file store size stats atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size())) + for _, ts := range res.r.TombstoneFiles() { + atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size)) + } // Re-initialize the lastModified time for the file store if res.r.LastModified() > lm { @@ -439,6 +442,10 @@ func (f *FileStore) Close() error { return nil } +func (f *FileStore) DiskSizeBytes() int64 { + return atomic.LoadInt64(&f.stats.DiskBytes) +} + // Read returns the slice of values for the given key and the given timestamp, // if any file matches those constraints. 
func (f *FileStore) Read(key string, t int64) ([]Value, error) { @@ -628,6 +635,10 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error { var totalSize int64 for _, file := range f.files { totalSize += int64(file.Size()) + for _, ts := range file.TombstoneFiles() { + totalSize += int64(ts.Size) + } + } atomic.StoreInt64(&f.stats.DiskBytes, totalSize) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index faff78c425..28082629b5 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -375,6 +375,10 @@ func (l *WAL) LastWriteTime() time.Time { return l.lastWriteTime } +func (l *WAL) DiskSizeBytes() int64 { + return atomic.LoadInt64(&l.stats.OldBytes) + atomic.LoadInt64(&l.stats.CurrentBytes) +} + func (l *WAL) writeToLog(entry WALEntry) (int, error) { // limit how many concurrent encodings can be in flight. Since we can only // write one at a time to disk, a slow disk can cause the allocations below diff --git a/tsdb/shard.go b/tsdb/shard.go index c03030093e..294b848939 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "math" - "os" "path/filepath" "regexp" "sort" @@ -207,6 +206,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { return nil } + // Refresh our disk size stat + _, _ = s.DiskSize() + // TODO(edd): Should statSeriesCreate be the current number of series in the // shard, or the total number of series ever created? sSketch, tSketch, err := s.engine.SeriesSketches() @@ -289,9 +291,6 @@ func (s *Shard) Open() error { } s.engine = e - s.wg.Add(1) - go s.monitor() - return nil }(); err != nil { s.close(true) @@ -355,6 +354,12 @@ func (s *Shard) close(clean bool) error { return err } +func (s *Shard) IndexType() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.index.Type() +} + // ready determines if the Shard is ready for queries or writes. 
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled func (s *Shard) ready() error { @@ -402,33 +407,9 @@ func (s *Shard) SetCompactionsEnabled(enabled bool) { // DiskSize returns the size on disk of this shard func (s *Shard) DiskSize() (int64, error) { - var size int64 - err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - - if !fi.IsDir() { - size += fi.Size() - } - return err - }) - if err != nil { - return 0, err - } - - err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - - if !fi.IsDir() { - size += fi.Size() - } - return err - }) - - return size, err + size := s.engine.DiskSize() + atomic.StoreInt64(&s.stats.DiskBytes, size) + return size, nil } // FieldCreate holds information for a field to create on a measurement. @@ -984,63 +965,12 @@ func (s *Shard) CreateSnapshot() (string, error) { return s.engine.CreateSnapshot() } -func (s *Shard) monitor() { - defer s.wg.Done() +func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { + return s.engine.ForEachMeasurementTagKey(name, fn) +} - t := time.NewTicker(monitorStatInterval) - defer t.Stop() - t2 := time.NewTicker(time.Minute) - defer t2.Stop() - var changed time.Time - - for { - select { - case <-s.closing: - return - case <-t.C: - // Checking DiskSize can be expensive with a lot of shards and TSM files, only - // check if something has changed. 
- lm := s.LastModified() - if lm.Equal(changed) { - continue - } - - size, err := s.DiskSize() - if err != nil { - s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err)) - continue - } - atomic.StoreInt64(&s.stats.DiskBytes, size) - changed = lm - case <-t2.C: - if s.options.Config.MaxValuesPerTag == 0 { - continue - } - - names, err := s.MeasurementNamesByExpr(nil) - if err != nil { - s.logger.Warn("cannot retrieve measurement names", zap.Error(err)) - continue - } - - for _, name := range names { - s.engine.ForEachMeasurementTagKey(name, func(k []byte) error { - n := s.engine.TagKeyCardinality(name, k) - perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100) - if perc > 100 { - perc = 100 - } - - // Log at 80, 85, 90-100% levels - if perc == 80 || perc == 85 || perc >= 90 { - s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", - perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k)) - } - return nil - }) - } - } - } +func (s *Shard) TagKeyCardinality(name, key []byte) int { + return s.engine.TagKeyCardinality(name, key) } type ShardGroup interface { diff --git a/tsdb/store.go b/tsdb/store.go index 3abe51f3f9..cb4aafe272 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1055,6 +1055,8 @@ func (s *Store) monitorShards() { defer s.wg.Done() t := time.NewTicker(10 * time.Second) defer t.Stop() + t2 := time.NewTicker(time.Minute) + defer t2.Stop() for { select { case <-s.closing: @@ -1069,6 +1071,45 @@ func (s *Store) monitorShards() { } } s.mu.RUnlock() + case <-t2.C: + if s.EngineOptions.Config.MaxValuesPerTag == 0 { + continue + } + + s.mu.RLock() + shards := s.filterShards(func(sh *Shard) bool { + return sh.IndexType() == "inmem" + }) + s.mu.RUnlock() + + s.walkShards(shards, func(sh *Shard) error { + db := sh.database + id := sh.id + + names, err := sh.MeasurementNamesByExpr(nil) + if err != nil { + s.Logger.Warn("cannot retrieve measurement 
names", zap.Error(err)) + return nil + } + + for _, name := range names { + sh.ForEachMeasurementTagKey(name, func(k []byte) error { + n := sh.TagKeyCardinality(name, k) + perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100) + if perc > 100 { + perc = 100 + } + + // Log at 80, 85, 90-100% levels + if perc == 80 || perc == 85 || perc >= 90 { + s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", + perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, id, name, k)) + } + return nil + }) + } + return nil + }) } } } From b4ea5239100c378870262f3c3c4960bfc30e7358 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 3 May 2017 13:11:21 -0600 Subject: [PATCH 07/10] Include snapshot size in the total cache size This was causing a shard to appear idle when in fact a snapshot compaction was running. If the time was write, the compactions would be disabled and the snapshot compaction would be aborted. --- tsdb/engine/tsm1/cache.go | 6 +++--- tsdb/engine/tsm1/cache_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index d5c911bf82..8d8ababff6 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -260,7 +260,7 @@ func (c *Cache) Write(key string, values []Value) error { // Enough room in the cache? limit := c.maxSize - n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize + n := c.Size() + addedSize if limit > 0 && n > limit { atomic.AddInt64(&c.stats.WriteErr, 1) @@ -293,7 +293,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { // Enough room in the cache? limit := c.maxSize // maxSize is safe for reading without a lock. 
- n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize + n := c.Size() + addedSize if limit > 0 && n > limit { atomic.AddInt64(&c.stats.WriteErr, 1) return ErrCacheMemorySizeLimitExceeded(n, limit) @@ -416,7 +416,7 @@ func (c *Cache) ClearSnapshot(success bool) { // Size returns the number of point-calcuated bytes the cache currently uses. func (c *Cache) Size() uint64 { - return atomic.LoadUint64(&c.size) + return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize) } // increaseSize increases size by delta. diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index a7637ea49e..7376e0b595 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -448,7 +448,7 @@ func TestCache_Snapshot_Stats(t *testing.T) { } // Store size should have been reset. - if got, exp := c.Size(), uint64(0); got != exp { + if got, exp := c.Size(), uint64(16); got != exp { t.Fatalf("got %v, expected %v", got, exp) } From 7371f1067bef109b61ecc5e741fcadb480596587 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 3 May 2017 22:48:10 -0600 Subject: [PATCH 08/10] Fix deadlock in Index.ForEachMeasurementTagKey Index.ForEachMeasurementTagKey held an RLock while call the fn, if the fn made another call into the index which acquired an RLock and after another goroutine tried to acquire a Lock, it would deadlock. --- tsdb/index/inmem/inmem.go | 12 +++++++----- tsdb/meta.go | 5 +---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 940faa8d4b..ca40cd8ce3 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -263,10 +263,13 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // ForEachMeasurementTagKey iterates over all tag keys for a measurement. 
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { + // Ensure we do not hold a lock on the index while fn executes in case fn tries + // to acquire a lock on the index again. If another goroutine has Lock, this will + // deadlock. i.mu.RLock() - defer i.mu.RUnlock() - mm := i.measurements[string(name)] + i.mu.RUnlock() + if mm == nil { return nil } @@ -537,9 +540,9 @@ func (i *Index) DropSeries(key []byte) error { // ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression. func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error { i.mu.RLock() - defer i.mu.RUnlock() - mm := i.measurements[string(name)] + i.mu.RUnlock() + if mm == nil { return nil } @@ -731,7 +734,6 @@ type ShardIndex struct { // CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk. func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error { - keys, names, tagsSlice = idx.assignExistingSeries(idx.id, keys, names, tagsSlice) if len(keys) == 0 { return nil diff --git a/tsdb/meta.go b/tsdb/meta.go index b127c33899..8d8ffae6ad 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -287,12 +287,9 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags return err } - m.mu.RLock() - defer m.mu.RUnlock() - // Iterate over each series. for _, id := range ids { - s := m.seriesByID[id] + s := m.SeriesByID(id) if err := fn(s.Tags()); err != nil { return err } From bc639c598201752f524c23a8ef7a3fd1a21d6c3c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 4 May 2017 09:56:15 -0600 Subject: [PATCH 09/10] Make disableLevelCompactions lighter weight Since this is called more frequently now, the cleanup func was invoked quite a bit which makes several syscalls per shard. This should only be called the first time compactions are disabled. 
--- tsdb/engine/tsm1/engine.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cd665e5bbe..b4572d7fd6 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -245,6 +245,7 @@ func (e *Engine) disableLevelCompactions(wait bool) { e.levelWorkers += 1 } + var cleanup bool if old == 0 && e.done != nil { // Prevent new compactions from starting e.Compactor.DisableCompactions() @@ -252,12 +253,13 @@ func (e *Engine) disableLevelCompactions(wait bool) { // Stop all background compaction goroutines close(e.done) e.done = nil + cleanup = true } e.mu.Unlock() e.wg.Wait() - if old == 0 { // first to disable should cleanup + if cleanup { // first to disable should cleanup if err := e.cleanup(); err != nil { e.logger.Info(fmt.Sprintf("error cleaning up temp file: %v", err)) } From fc34d30038aced69aad6d6e7689001708498da5c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 4 May 2017 10:12:38 -0600 Subject: [PATCH 10/10] Uses SeriesN instead of copying sketches Avoids some extra allocations. --- tsdb/shard.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/tsdb/shard.go b/tsdb/shard.go index 294b848939..5682f2f7f4 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -208,15 +208,7 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { // Refresh our disk size stat _, _ = s.DiskSize() - - // TODO(edd): Should statSeriesCreate be the current number of series in the - // shard, or the total number of series ever created? - sSketch, tSketch, err := s.engine.SeriesSketches() - seriesN := int64(sSketch.Count() - tSketch.Count()) - if err != nil { - s.logger.Error("cannot compute series sketch", zap.Error(err)) - seriesN = 0 - } + seriesN := s.engine.SeriesN() tags = s.defaultTags.Merge(tags) statistics := []models.Statistic{{