fix: avoid compaction queue stats flutter (#22235)

When the compaction planner runs, if it cannot acquire
a lock on the files it plans to compact, it returns a
nil list of compaction groups. This, in turn, sets the
engine statistics for compactions queues to zero,
which is incorrect. Instead, use the length of pending
files which would have been returned.

closes https://github.com/influxdata/influxdb/issues/22138

(cherry picked from commit 7d3efe1e9e)

closes https://github.com/influxdata/influxdb/issues/22141
pull/22257/head
davidby-influx 2021-08-17 14:03:54 -07:00 committed by GitHub
parent 991ff02ea7
commit 9923d2e8d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 148 additions and 72 deletions

View File

@ -82,6 +82,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [#22174](https://github.com/influxdata/influxdb/pull/22174): systemd service -- handle 40x and block indefinitely
1. [#22228](https://github.com/influxdata/influxdb/pull/22228): influxdb2 packages should depend on curl
1. [#22211](https://github.com/influxdata/influxdb/pull/22211): Prevent scheduling an inactivated tasks after updating it
1. [#22235](https://github.com/influxdata/influxdb/pull/22235): Avoid compaction queue stats flutter
## v2.0.7 [2021-06-04]

View File

@ -89,9 +89,9 @@ type CompactionGroup []string
// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run.
type CompactionPlanner interface {
Plan(lastWrite time.Time) []CompactionGroup
PlanLevel(level int) []CompactionGroup
PlanOptimize() []CompactionGroup
Plan(lastWrite time.Time) ([]CompactionGroup, int64)
PlanLevel(level int) ([]CompactionGroup, int64)
PlanOptimize() ([]CompactionGroup, int64)
Release(group []CompactionGroup)
FullyCompacted() (bool, string)
@ -234,13 +234,13 @@ func (c *DefaultPlanner) ForceFull() {
}
// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil
return nil, 0
}
c.mu.RUnlock()
@ -252,7 +252,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}
// Group each generation by level such that two adjacent generations in the same
@ -321,22 +321,22 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}
if !c.acquire(cGroups) {
return nil
return nil, int64(len(cGroups))
}
return cGroups
return cGroups, int64(len(cGroups))
}
// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil
return nil, 0
}
c.mu.RUnlock()
@ -348,7 +348,7 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}
// Group each generation by level such that two adjacent generations in the same
@ -413,15 +413,15 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
}
if !c.acquire(cGroups) {
return nil
return nil, int64(len(cGroups))
}
return cGroups
return cGroups, int64(len(cGroups))
}
// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
// multiple groups if possible to allow compactions to run concurrently.
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
generations := c.findGenerations(true)
c.mu.RLock()
@ -471,19 +471,19 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
// Make sure we have more than 1 file and more than 1 generation
if len(tsmFiles) <= 1 || genCount <= 1 {
return nil
return nil, 0
}
group := []CompactionGroup{tsmFiles}
if !c.acquire(group) {
return nil
return nil, int64(len(group))
}
return group
return group, int64(len(group))
}
// don't plan if nothing has changed in the filestore
if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
return nil
return nil, 0
}
c.lastPlanCheck = time.Now()
@ -491,7 +491,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
// If there is only one generation, return early to avoid re-compacting the same file
// over and over again.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}
// Need to find the ending point for level 4 files. They will be the oldest files. We scan
@ -584,7 +584,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
}
if len(groups) == 0 {
return nil
return nil, 0
}
// With the groups, we need to evaluate whether the group as a whole can be compacted
@ -612,9 +612,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
}
if !c.acquire(tsmFiles) {
return nil
return nil, int64(len(tsmFiles))
}
return tsmFiles
return tsmFiles, int64(len(tsmFiles))
}
// findGenerations groups all the TSM files by generation based

View File

@ -1535,9 +1535,11 @@ func TestDefaultPlanner_Plan_Min(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.Plan(time.Now())
tsm, pLen := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -1584,9 +1586,11 @@ func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) {
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm := cp.Plan(time.Now())
tsm, pLen := cp.Plan(time.Now())
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
@ -1645,10 +1649,12 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) {
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3],
data[4], data[5], data[6], data[7]}
tsm := cp.Plan(time.Now())
tsm, pLen := cp.Plan(time.Now())
if got, exp := len(tsm), 2; got != exp {
t.Fatalf("compaction group length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
if exp, got := len(expFiles[:4]), len(tsm[0]); got != exp {
@ -1670,7 +1676,6 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
// Ensure that the planner grabs the smallest compaction step
@ -1735,9 +1740,11 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
)
expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]}
tsm := cp.PlanLevel(1)
tsm, pLen := cp.PlanLevel(1)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
@ -1788,9 +1795,11 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) {
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4]}
tsm := cp.PlanLevel(3)
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
@ -1841,9 +1850,11 @@ func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) {
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanLevel(3)
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -1884,9 +1895,11 @@ func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) {
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanLevel(3)
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -1916,9 +1929,11 @@ func TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) {
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanLevel(2)
tsm, pLen := cp.PlanLevel(2)
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -1960,9 +1975,11 @@ func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) {
)
expFiles := []tsm1.FileStat{data[0], data[1]}
tsm := cp.PlanLevel(3)
tsm, pLen := cp.PlanLevel(3)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
@ -2018,9 +2035,11 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
tsm := cp.PlanLevel(1)
tsm, pLen := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles1 {
@ -2109,9 +2128,11 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
expFiles1 := data[0:8]
expFiles2 := data[8:16]
tsm := cp.PlanLevel(1)
tsm, pLen := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles1 {
@ -2132,9 +2153,11 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
cp.Release(tsm[1:])
tsm = cp.PlanLevel(1)
tsm, pLen = cp.PlanLevel(1)
if exp, got := len(expFiles2), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles2 {
@ -2169,9 +2192,11 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) {
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanOptimize()
tsm, pLen := cp.PlanOptimize()
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -2216,9 +2241,11 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) {
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5]}
tsm := cp.PlanOptimize()
tsm, pLen := cp.PlanOptimize()
if exp, got := 1, len(tsm); exp != got {
t.Fatalf("group length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@ -2287,9 +2314,11 @@ func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) {
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[6], data[7], data[8], data[9]}
tsm := cp.PlanOptimize()
tsm, pLen := cp.PlanOptimize()
if exp, got := 2, len(tsm); exp != got {
t.Fatalf("group length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@ -2338,9 +2367,11 @@ func TestDefaultPlanner_PlanOptimize_Optimized(t *testing.T) {
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanOptimize()
tsm, pLen := cp.PlanOptimize()
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -2370,9 +2401,11 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) {
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2]}
tsm := cp.PlanOptimize()
tsm, pLen := cp.PlanOptimize()
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
@ -2422,9 +2455,11 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) {
time.Nanosecond,
)
tsm := cp.Plan(time.Now().Add(-time.Second))
tsm, pLen := cp.Plan(time.Now().Add(-time.Second))
if exp, got := len(data), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range data {
@ -2456,9 +2491,11 @@ func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.Plan(time.Now())
tsm, pLen := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -2492,10 +2529,12 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
}
cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond)
plan := cp.Plan(time.Now().Add(-time.Second))
plan, pLen := cp.Plan(time.Now().Add(-time.Second))
// first verify that our test set would return files
if exp, got := 4, len(plan[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
@ -2531,16 +2570,20 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
}
cp.FileStore = overFs
plan = cp.Plan(time.Now().Add(-time.Second))
plan, pLen = cp.Plan(time.Now().Add(-time.Second))
if exp, got := 0, len(plan); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
plan = cp.PlanOptimize()
plan, pLen = cp.PlanOptimize()
// ensure the optimize planner would pick this up
if exp, got := 1, len(plan); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
@ -2548,8 +2591,11 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
// ensure that it will plan if last modified has changed
fs.lastModified = time.Now()
if exp, got := 4, len(cp.Plan(time.Now())[0]); got != exp {
cGroups, pLen := cp.Plan(time.Now())
if exp, got := 4, len(cGroups[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(cGroups)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -2609,9 +2655,11 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) {
},
time.Hour)
tsm := cp.Plan(time.Now().Add(-24 * time.Hour))
tsm, pLen := cp.Plan(time.Now().Add(-24 * time.Hour))
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -2649,10 +2697,12 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
time.Nanosecond,
)
plan := cp.Plan(time.Now().Add(-time.Second))
plan, pLen := cp.Plan(time.Now().Add(-time.Second))
// first verify that our test set would return files
if exp, got := 4, len(plan[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(plan)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(plan)
@ -2676,8 +2726,11 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
}
cp.FileStore = overFs
if exp, got := 1, len(cp.Plan(time.Now().Add(-time.Second))); got != exp {
cGroups, pLen := cp.Plan(time.Now().Add(-time.Second))
if exp, got := 1, len(cGroups); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(cGroups)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -2716,9 +2769,11 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) {
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm := cp.Plan(time.Now())
tsm, pLen := cp.Plan(time.Now())
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
for i, p := range expFiles {
@ -2758,9 +2813,11 @@ func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.Plan(time.Now())
tsm, pLen := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
}
@ -2826,36 +2883,46 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.PlanLevel(1)
tsm, pLen := cp.PlanLevel(1)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm = cp.PlanLevel(2)
tsm, pLen = cp.PlanLevel(2)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
cp.ForceFull()
// Level plans should not return any plans
tsm = cp.PlanLevel(1)
tsm, pLen = cp.PlanLevel(1)
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm = cp.PlanLevel(2)
tsm, pLen = cp.PlanLevel(2)
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm = cp.Plan(time.Now())
tsm, pLen = cp.Plan(time.Now())
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
if got, exp := len(tsm[0]), 13; got != exp {
@ -2864,15 +2931,19 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
cp.Release(tsm)
// Level plans should return plans now that Plan has been called
tsm = cp.PlanLevel(1)
tsm, pLen = cp.PlanLevel(1)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)
tsm = cp.PlanLevel(2)
tsm, pLen = cp.PlanLevel(2)
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
}
cp.Release(tsm)

View File

@ -2013,24 +2013,28 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
case <-t.C:
// Find our compaction plans
level1Groups := e.CompactionPlan.PlanLevel(1)
level2Groups := e.CompactionPlan.PlanLevel(2)
level3Groups := e.CompactionPlan.PlanLevel(3)
level4Groups := e.CompactionPlan.Plan(e.LastModified())
atomic.StoreInt64(&e.stats.TSMFullCompactionsQueue, int64(len(level4Groups)))
level1Groups, len1 := e.CompactionPlan.PlanLevel(1)
level2Groups, len2 := e.CompactionPlan.PlanLevel(2)
level3Groups, len3 := e.CompactionPlan.PlanLevel(3)
level4Groups, len4 := e.CompactionPlan.Plan(e.LastModified())
atomic.StoreInt64(&e.stats.TSMFullCompactionsQueue, len4)
// If no full compactions are need, see if an optimize is needed
if len(level4Groups) == 0 {
level4Groups = e.CompactionPlan.PlanOptimize()
atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
level4Groups, len4 = e.CompactionPlan.PlanOptimize()
atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4)
}
// Update the level plan queue stats
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], int64(len(level1Groups)))
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], int64(len(level2Groups)))
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], int64(len(level3Groups)))
// For stats, use the length needed, even if the lock was
// not acquired
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], len1)
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], len2)
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], len3)
// Set the queue depths on the scheduler
// Use the real queue depth, dependent on acquiring
// the file locks.
e.scheduler.setDepth(1, len(level1Groups))
e.scheduler.setDepth(2, len(level2Groups))
e.scheduler.setDepth(3, len(level3Groups))

View File

@ -2731,10 +2731,10 @@ func MustParsePointString(buf string) models.Point { return MustParsePointsStrin
type mockPlanner struct{}
func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
func (m *mockPlanner) Plan(lastWrite time.Time) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
func (m *mockPlanner) PlanLevel(level int) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64) { return nil, 0 }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
func (m *mockPlanner) FullyCompacted() (bool, string) { return false, "not compacted" }
func (m *mockPlanner) ForceFull() {}
func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}