From 2d85ff1d09c13a666ed3bd04b2197ea98655d845 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 13 Dec 2017 08:20:35 -0700 Subject: [PATCH] Adjust compaction planning Increase level 1 min criteria, fix only fast compactions getting run, and fix very large generations getting included in optimize plans. --- tsdb/engine/tsm1/compact.go | 109 +++++++++++++++--------- tsdb/engine/tsm1/compact_test.go | 142 +++++++++++++++---------------- tsdb/engine/tsm1/engine.go | 16 ++-- 3 files changed, 148 insertions(+), 119 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index bb3ee07c44..2e58058c53 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -254,6 +254,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { } minGenerations := 4 + if level == 1 { + minGenerations = 8 + } + var cGroups []CompactionGroup for _, group := range levelGroups { for _, chunk := range group.chunk(minGenerations) { @@ -314,6 +318,11 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup { for i := 0; i < len(generations); i++ { cur := generations[i] + // Skip the file if it's over the max size and contains a full block and it does not have any tombstones + if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() { + continue + } + // See if this generation is orphan'd which would prevent it from being further // compacted until a final full compactin runs. 
if i < len(generations)-1 { @@ -542,7 +551,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { compactable := []tsmGenerations{} for _, group := range groups { //if we don't have enough generations to compact, skip it - if len(group) < 2 && !group.hasTombstones() { + if len(group) < 4 && !group.hasTombstones() { continue } compactable = append(compactable, group) @@ -673,8 +682,7 @@ type Compactor struct { // lastSnapshotDuration is the amount of time the last snapshot took to complete. lastSnapshotDuration time.Duration - // snapshotConcurrency is the amount of parallelism used to snapshot the cache. - snapshotConcurrency int + snapshotLatencies *latencies // The channel to signal that any in progress snapshots should be aborted. snapshotsInterrupt chan struct{} @@ -696,7 +704,7 @@ func (c *Compactor) Open() { c.compactionsEnabled = true c.snapshotsInterrupt = make(chan struct{}) c.compactionsInterrupt = make(chan struct{}) - c.snapshotConcurrency = 1 + c.snapshotLatencies = &latencies{values: make([]time.Duration, 4)} c.files = make(map[string]struct{}) } @@ -765,7 +773,6 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { c.mu.RLock() enabled := c.snapshotsEnabled intC := c.snapshotsInterrupt - concurrency := c.snapshotConcurrency c.mu.RUnlock() if !enabled { @@ -773,6 +780,22 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { } start := time.Now() + card := cache.Count() + + // Enable throttling if we have lower cardinality or snapshots are going fast. + throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second + + // Write snapshots concurrently if cardinality is relatively high. + concurrency := card / 2e6 + if concurrency < 1 { + concurrency = 1 + } + + // Special case very high cardinality, use max concurrency and don't throttle writes. 
+ if card >= 3e6 { + concurrency = 4 + throttle = false + } splits := cache.Split(concurrency) @@ -785,7 +808,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { for i := 0; i < concurrency; i++ { go func(sp *Cache) { iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC) - files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter) + files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter, throttle) resC <- res{files: files, err: err} }(splits[i]) @@ -802,35 +825,13 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { } dur := time.Since(start).Truncate(time.Second) - maxConcurrency := runtime.GOMAXPROCS(0) / 2 - if maxConcurrency < 1 { - maxConcurrency = 1 - } - if maxConcurrency > 4 { - maxConcurrency = 4 - } c.mu.Lock() // See if we were disabled while writing a snapshot enabled = c.snapshotsEnabled - - // See if we need to adjust our snapshot concurrency - if dur > 30*time.Second && dur > c.lastSnapshotDuration { - // Increase snapshot concurrency if they are running slow - c.snapshotConcurrency++ - if c.snapshotConcurrency > maxConcurrency { - c.snapshotConcurrency = maxConcurrency - } - } else if dur < 30*time.Second && dur < c.lastSnapshotDuration { - // Decrease snapshot concurrency if they are running too fast - c.snapshotConcurrency-- - if c.snapshotConcurrency < 1 { - c.snapshotConcurrency = 1 - } - } - c.lastSnapshotDuration = dur + c.snapshotLatencies.add(time.Since(start)) c.mu.Unlock() if !enabled { @@ -899,7 +900,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { return nil, err } - return c.writeNewFiles(maxGeneration, maxSequence, tsm) + return c.writeNewFiles(maxGeneration, maxSequence, tsm, true) } // CompactFull writes multiple smaller TSM files into 1 or more larger files. 
@@ -980,7 +981,7 @@ func (c *Compactor) removeTmpFiles(files []string) error { // writeNewFiles writes from the iterator into new TSM files, rotating // to a new file once it has reached the max TSM file size. -func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) { +func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator, throttle bool) ([]string, error) { // These are the new TSM files written var files []string @@ -990,7 +991,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension)) // Write as much as possible to this file - err := c.write(fileName, iter) + err := c.write(fileName, iter, throttle) // We've hit the max file limit and there is more to write. Create a new file // and continue. @@ -1029,17 +1030,19 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ return files, nil } -func (c *Compactor) write(path string, iter KeyIterator) (err error) { +func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) { fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return errCompactionInProgress{err: err} } // Create the write for the new TSM file. - var w TSMWriter + var ( + w TSMWriter + limitWriter io.Writer = fd + ) - var limitWriter io.Writer = fd - if c.RateLimit != nil { + if c.RateLimit != nil && throttle { limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit) } @@ -1549,8 +1552,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte } func (c *cacheKeyIterator) EstimatedIndexSize() int { - // We return 0 here since we already have all the entries in memory to write an index. 
- return 0 + var n int + for _, v := range c.order { + n += len(v) + } + return n } func (c *cacheKeyIterator) encode() { @@ -1724,3 +1730,30 @@ func (a tsmGenerations) IsSorted() bool { } return true } + +type latencies struct { + i int + values []time.Duration +} + +func (l *latencies) add(t time.Duration) { + l.values[l.i%len(l.values)] = t + l.i++ +} + +func (l *latencies) avg() time.Duration { + var n int64 + var sum time.Duration + for _, v := range l.values { + if v == 0 { + continue + } + sum += v + n++ + } + + if n > 0 { + return time.Duration(int64(sum) / n) + } + return time.Duration(0) +} diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 59221315d9..617b7567a6 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1469,6 +1469,22 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { Path: "08-01.tsm1", Size: 1 * 1024 * 1024, }, + tsm1.FileStat{ + Path: "09-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "10-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "11-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "12-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1479,7 +1495,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) @@ -1545,55 +1561,6 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) { } } -func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) { - data := []tsm1.FileStat{ - tsm1.FileStat{ - Path: "01-03.tsm1", - Size: 251 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "02-03.tsm1", 
- Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "03-01.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "04-01.tsm1", - Size: 10 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "05-02.tsm1", - Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "06-01.tsm1", - Size: 1 * 1024 * 1024, - }, - } - - cp := tsm1.NewDefaultPlanner( - &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data - }, - }, tsdb.DefaultCompactFullWriteColdDuration, - ) - - expFiles := []tsm1.FileStat{data[2], data[3]} - tsm := cp.PlanLevel(1) - if exp, got := len(expFiles), len(tsm[0]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } - - for i, p := range expFiles { - if got, exp := tsm[0][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } -} - func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ @@ -1810,8 +1777,7 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -1823,16 +1789,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) } } - - if exp, got := len(expFiles2), len(tsm[1]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } - - for i, p := range expFiles2 { - if got, exp := tsm[1][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } } func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { @@ -1877,6 +1833,30 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { Path: "10-01.tsm1", Size: 1 * 1024 * 1024, }, + 
tsm1.FileStat{ + Path: "11-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "12-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "13-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "14-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "15-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "16-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1887,8 +1867,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles1 := data[0:8] + expFiles2 := data[8:16] tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -2567,25 +2547,41 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { Size: 2148728539, }, tsm1.FileStat{ - Path: "000000005-000000002.tsm", - Size: 701863692, + Path: "000000005-000000001.tsm", + Size: 2148340232, }, tsm1.FileStat{ - Path: "000000006-000000002.tsm", - Size: 701863692, + Path: "000000006-000000001.tsm", + Size: 2148356556, }, tsm1.FileStat{ - Path: "000000007-000000002.tsm", - Size: 701863692, + Path: "000000007-000000001.tsm", + Size: 167780181, }, tsm1.FileStat{ - Path: "000000008-000000002.tsm", - Size: 701863692, + Path: "000000008-000000001.tsm", + Size: 2148728539, }, tsm1.FileStat{ Path: "000000009-000000002.tsm", Size: 701863692, }, + tsm1.FileStat{ + Path: "000000010-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000011-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000012-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000013-000000002.tsm", + Size: 701863692, + }, } }, }, tsdb.DefaultCompactFullWriteColdDuration, @@ -2623,7 +2619,7 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, 
exp) } - if got, exp := len(tsm[0]), 9; got != exp { + if got, exp := len(tsm[0]), 13; got != exp { t.Fatalf("plan length mismatch: got %v, exp %v", got, exp) } cp.Release(tsm) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index fbc0de5fbf..9bc17273e2 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1695,7 +1695,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compact(quit <-chan struct{}) { - t := time.NewTicker(10 * time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1757,15 +1757,15 @@ func (e *Engine) compact(quit <-chan struct{}) { switch level { case 1: - if e.compactHiPriorityLevel(level1Groups[0], 1) { + if e.compactHiPriorityLevel(level1Groups[0], 1, false) { level1Groups = level1Groups[1:] } case 2: - if e.compactHiPriorityLevel(level2Groups[0], 2) { + if e.compactHiPriorityLevel(level2Groups[0], 2, false) { level2Groups = level2Groups[1:] } case 3: - if e.compactLoPriorityLevel(level3Groups[0], 3) { + if e.compactLoPriorityLevel(level3Groups[0], 3, true) { level3Groups = level3Groups[1:] } case 4: @@ -1786,8 +1786,8 @@ func (e *Engine) compact(quit <-chan struct{}) { // compactHiPriorityLevel kicks off compactions using the high priority policy. It returns // true if the compaction was started -func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool { - s := e.levelCompactionStrategy(grp, true, level) +func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool) bool { + s := e.levelCompactionStrategy(grp, fast, level) if s == nil { return false } @@ -1815,8 +1815,8 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool { // compactLoPriorityLevel kicks off compactions using the lo priority policy. 
It returns // the plans that were not able to be started -func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int) bool { - s := e.levelCompactionStrategy(grp, true, level) +func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool) bool { + s := e.levelCompactionStrategy(grp, fast, level) if s == nil { return false }