Adjust compaction planning

Increase level 1 min criteria, fix only fast compactions getting run,
and fix very large generations getting included in optimize plans.
pull/9225/head
Jason Wilder 2017-12-13 08:20:35 -07:00
parent 749c9d2483
commit 2d85ff1d09
3 changed files with 148 additions and 119 deletions

View File

@ -254,6 +254,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}
minGenerations := 4
if level == 1 {
minGenerations = 8
}
var cGroups []CompactionGroup
for _, group := range levelGroups {
for _, chunk := range group.chunk(minGenerations) {
@ -314,6 +318,11 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
for i := 0; i < len(generations); i++ {
cur := generations[i]
// Skip the generation if it has more than 2 files, is over the max size, contains a full block, and has no tombstones
if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
continue
}
// See if this generation is orphan'd which would prevent it from being further
// compacted until a final full compaction runs.
if i < len(generations)-1 {
@ -542,7 +551,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
compactable := []tsmGenerations{}
for _, group := range groups {
// if we don't have enough generations to compact, skip it
if len(group) < 2 && !group.hasTombstones() {
if len(group) < 4 && !group.hasTombstones() {
continue
}
compactable = append(compactable, group)
@ -673,8 +682,7 @@ type Compactor struct {
// lastSnapshotDuration is the amount of time the last snapshot took to complete.
lastSnapshotDuration time.Duration
// snapshotConcurrency is the amount of parallelism used to snapshot the cache.
snapshotConcurrency int
snapshotLatencies *latencies
// The channel to signal that any in progress snapshots should be aborted.
snapshotsInterrupt chan struct{}
@ -696,7 +704,7 @@ func (c *Compactor) Open() {
c.compactionsEnabled = true
c.snapshotsInterrupt = make(chan struct{})
c.compactionsInterrupt = make(chan struct{})
c.snapshotConcurrency = 1
c.snapshotLatencies = &latencies{values: make([]time.Duration, 4)}
c.files = make(map[string]struct{})
}
@ -765,7 +773,6 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
enabled := c.snapshotsEnabled
intC := c.snapshotsInterrupt
concurrency := c.snapshotConcurrency
c.mu.RUnlock()
if !enabled {
@ -773,6 +780,22 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
}
start := time.Now()
card := cache.Count()
// Enable throttling if cardinality is low and recent snapshots have been completing quickly.
throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second
// Write snapshots concurrently if cardinality is relatively high.
concurrency := card / 2e6
if concurrency < 1 {
concurrency = 1
}
// Special case very high cardinality, use max concurrency and don't throttle writes.
if card >= 3e6 {
concurrency = 4
throttle = false
}
splits := cache.Split(concurrency)
@ -785,7 +808,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
for i := 0; i < concurrency; i++ {
go func(sp *Cache) {
iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter, throttle)
resC <- res{files: files, err: err}
}(splits[i])
@ -802,35 +825,13 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
}
dur := time.Since(start).Truncate(time.Second)
maxConcurrency := runtime.GOMAXPROCS(0) / 2
if maxConcurrency < 1 {
maxConcurrency = 1
}
if maxConcurrency > 4 {
maxConcurrency = 4
}
c.mu.Lock()
// See if we were disabled while writing a snapshot
enabled = c.snapshotsEnabled
// See if we need to adjust our snapshot concurrency
if dur > 30*time.Second && dur > c.lastSnapshotDuration {
// Increase snapshot concurrency if they are running slow
c.snapshotConcurrency++
if c.snapshotConcurrency > maxConcurrency {
c.snapshotConcurrency = maxConcurrency
}
} else if dur < 30*time.Second && dur < c.lastSnapshotDuration {
// Decrease snapshot concurrency if they are running too fast
c.snapshotConcurrency--
if c.snapshotConcurrency < 1 {
c.snapshotConcurrency = 1
}
}
c.lastSnapshotDuration = dur
c.snapshotLatencies.add(time.Since(start))
c.mu.Unlock()
if !enabled {
@ -899,7 +900,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, err
}
return c.writeNewFiles(maxGeneration, maxSequence, tsm)
return c.writeNewFiles(maxGeneration, maxSequence, tsm, true)
}
// CompactFull writes multiple smaller TSM files into 1 or more larger files.
@ -980,7 +981,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) {
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator, throttle bool) ([]string, error) {
// These are the new TSM files written
var files []string
@ -990,7 +991,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension))
// Write as much as possible to this file
err := c.write(fileName, iter)
err := c.write(fileName, iter, throttle)
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
@ -1029,17 +1030,19 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
return files, nil
}
func (c *Compactor) write(path string, iter KeyIterator) (err error) {
func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
}
// Create the write for the new TSM file.
var w TSMWriter
var (
w TSMWriter
limitWriter io.Writer = fd
)
var limitWriter io.Writer = fd
if c.RateLimit != nil {
if c.RateLimit != nil && throttle {
limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit)
}
@ -1549,8 +1552,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
}
// EstimatedIndexSize returns an estimate, in bytes, of the index that will be
// written for the cached entries: the sum of the lengths of the keys in
// c.order. (Presumably c.order holds the series keys to be written — confirm
// against the cacheKeyIterator definition.)
func (c *cacheKeyIterator) EstimatedIndexSize() int {
	// Defect fixed: the body previously began with `return 0` (a stale
	// early-return left behind), making the size computation below
	// unreachable so callers always saw an estimate of 0.
	var n int
	for _, v := range c.order {
		n += len(v)
	}
	return n
}
func (c *cacheKeyIterator) encode() {
@ -1724,3 +1730,30 @@ func (a tsmGenerations) IsSorted() bool {
}
return true
}
type latencies struct {
i int
values []time.Duration
}
func (l *latencies) add(t time.Duration) {
l.values[l.i%len(l.values)] = t
l.i++
}
func (l *latencies) avg() time.Duration {
var n int64
var sum time.Duration
for _, v := range l.values {
if v == 0 {
continue
}
sum += v
n++
}
if n > 0 {
return time.Duration(int64(sum) / n)
}
return time.Duration(0)
}

View File

@ -1469,6 +1469,22 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
Path: "08-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "09-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "10-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "11-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "12-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
@ -1479,7 +1495,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]}
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
@ -1545,55 +1561,6 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) {
}
}
func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-01.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-01.tsm1",
Size: 10 * 1024 * 1024,
},
tsm1.FileStat{
Path: "05-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[2], data[3]}
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
@ -1810,8 +1777,7 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@ -1823,16 +1789,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
if exp, got := len(expFiles2), len(tsm[1]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles2 {
if got, exp := tsm[1][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
@ -1877,6 +1833,30 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
Path: "10-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "11-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "12-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "13-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "14-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "15-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "16-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
@ -1887,8 +1867,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
expFiles1 := data[0:8]
expFiles2 := data[8:16]
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@ -2567,25 +2547,41 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
Size: 2148728539,
},
tsm1.FileStat{
Path: "000000005-000000002.tsm",
Size: 701863692,
Path: "000000005-000000001.tsm",
Size: 2148340232,
},
tsm1.FileStat{
Path: "000000006-000000002.tsm",
Size: 701863692,
Path: "000000006-000000001.tsm",
Size: 2148356556,
},
tsm1.FileStat{
Path: "000000007-000000002.tsm",
Size: 701863692,
Path: "000000007-000000001.tsm",
Size: 167780181,
},
tsm1.FileStat{
Path: "000000008-000000002.tsm",
Size: 701863692,
Path: "000000008-000000001.tsm",
Size: 2148728539,
},
tsm1.FileStat{
Path: "000000009-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000010-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000011-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000012-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000013-000000002.tsm",
Size: 701863692,
},
}
},
}, tsdb.DefaultCompactFullWriteColdDuration,
@ -2623,7 +2619,7 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(tsm[0]), 9; got != exp {
if got, exp := len(tsm[0]), 13; got != exp {
t.Fatalf("plan length mismatch: got %v, exp %v", got, exp)
}
cp.Release(tsm)

View File

@ -1695,7 +1695,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
}
func (e *Engine) compact(quit <-chan struct{}) {
t := time.NewTicker(10 * time.Second)
t := time.NewTicker(time.Second)
defer t.Stop()
for {
@ -1757,15 +1757,15 @@ func (e *Engine) compact(quit <-chan struct{}) {
switch level {
case 1:
if e.compactHiPriorityLevel(level1Groups[0], 1) {
if e.compactHiPriorityLevel(level1Groups[0], 1, false) {
level1Groups = level1Groups[1:]
}
case 2:
if e.compactHiPriorityLevel(level2Groups[0], 2) {
if e.compactHiPriorityLevel(level2Groups[0], 2, false) {
level2Groups = level2Groups[1:]
}
case 3:
if e.compactLoPriorityLevel(level3Groups[0], 3) {
if e.compactLoPriorityLevel(level3Groups[0], 3, true) {
level3Groups = level3Groups[1:]
}
case 4:
@ -1786,8 +1786,8 @@ func (e *Engine) compact(quit <-chan struct{}) {
// compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
// true if the compaction was started
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool {
s := e.levelCompactionStrategy(grp, true, level)
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
}
@ -1815,8 +1815,8 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool {
// compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
// the plans that were not able to be started
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int) bool {
s := e.levelCompactionStrategy(grp, true, level)
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
}