fix: PlanOptimize is running too frequently (#26211)

PlanOptimize is being checked far too frequently. This PR is the simplest change that can be made to ensure that PlanOptimize is not run too often. To reduce the frequency, I've added a lastWrite parameter to PlanOptimize and added an additional test that mocks the edge case, seen out in the wild, that led to this PR.

Previously, the test cases for PlanOptimize did not check whether certain cases would be picked up by Plan. I've adjusted a few of the existing test cases after modifying Plan and PlanOptimize to have the same lastWrite time.
pull/21664/merge
WeblWabl 2025-04-08 12:22:29 -05:00 committed by GitHub
parent 61f21c5adb
commit 96e44cac73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 641 additions and 243 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/line"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -133,6 +134,22 @@ type shardCompactor struct {
newTSM []string
}
// The following methods are not used in production by this tool; they exist
// only to satisfy the tsm1 fileStore interface now required by
// Compactor.FileStore. See: https://github.com/influxdata/influxdb/pull/26211
//
// Stats is a stub; it reports no file statistics.
func (sc *shardCompactor) Stats() []tsm1.FileStat {
	return nil
}
// LastModified is a stub to satisfy the fileStore interface; it simply
// returns the current time on every call.
func (sc *shardCompactor) LastModified() time.Time {
	return time.Now()
}
// BlockCount is a stub to satisfy the fileStore interface; it always reports zero blocks.
func (sc *shardCompactor) BlockCount(path string, idx int) int { return 0 }
// ParseFileName is a stub to satisfy the fileStore interface; it always
// returns a "not implemented" error.
func (sc *shardCompactor) ParseFileName(path string) (int, int, error) {
	return 0, 0, errors.New("not implemented")
}
func newShardCompactor(path string, logger *zap.Logger) (sc *shardCompactor, err error) {
sc = &shardCompactor{
logger: logger,
@ -227,11 +244,10 @@ func (sc *shardCompactor) openFiles() error {
func (sc *shardCompactor) CompactShard() (err error) {
c := tsm1.NewCompactor()
c.Dir = sc.path
c.Size = tsm1.DefaultSegmentSize
c.FileStore = sc
c.Open()
tsmFiles, err := c.CompactFull(sc.tsm, sc.logger)
tsmFiles, err := c.CompactFull(sc.tsm, sc.logger, tsdb.DefaultAggressiveMaxPointsPerBlock)
if err == nil {
sc.newTSM, err = sc.replace(tsmFiles)
}

View File

@ -115,7 +115,7 @@ type CompactionPlanner interface {
// This value is mostly ignored in normal compaction code paths, but,
// for the edge case where there is a single generation with many
// files under 2 GB this value is an important indicator.
PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64)
PlanOptimize(lastWrite time.Time) (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64)
Release(group []CompactionGroup)
FullyCompacted() (bool, string)
@ -172,6 +172,8 @@ type fileStore interface {
LastModified() time.Time
BlockCount(path string, idx int) int
ParseFileName(path string) (int, int, error)
NextGeneration() int
TSMReader(path string) (*TSMReader, error)
}
func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
@ -253,9 +255,7 @@ func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
return c.FileStore.ParseFileName(path)
}
// FullyCompacted returns true if the shard is fully compacted.
func (c *DefaultPlanner) FullyCompacted() (bool, string) {
gens := c.findGenerations(false)
func (c *DefaultPlanner) generationsFullyCompacted(gens tsmGenerations) (bool, string) {
if len(gens) > 1 {
return false, "not fully compacted and not idle because of more than one generation"
} else if gens.hasTombstones() {
@ -286,6 +286,12 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
}
}
// FullyCompacted reports whether the shard is fully compacted, along with a
// human-readable reason when it is not. It is used to decide whether an
// optimized compaction may run and to gauge shard hot-ness.
func (c *DefaultPlanner) FullyCompacted() (bool, string) {
	gens := c.findGenerations(false)
	return c.generationsFullyCompacted(gens)
}
// ForceFull causes the planner to return a full compaction plan the next time
// a plan is requested. When ForceFull is called, level and optimize plans will
// not return plans until a full plan is requested and released.
@ -392,7 +398,7 @@ func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64) {
func (c *DefaultPlanner) PlanOptimize(lastWrite time.Time) (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
@ -406,9 +412,9 @@ func (c *DefaultPlanner) PlanOptimize() (compactGroup []CompactionGroup, compact
// a generation conceptually as a single file even though it may be
// split across several files in sequence.
generations := c.findGenerations(true)
fullyCompacted, _ := c.FullyCompacted()
fullyCompacted, _ := c.generationsFullyCompacted(generations)
if fullyCompacted {
if fullyCompacted || time.Since(lastWrite) < c.compactFullWriteColdDuration {
return nil, 0, 0
}
@ -758,13 +764,9 @@ func (c *DefaultPlanner) Release(groups []CompactionGroup) {
// Compactor merges multiple TSM files into new files or
// writes a Cache into 1 or more TSM files.
type Compactor struct {
Dir string
Size int
Dir string
FileStore interface {
NextGeneration() int
TSMReader(path string) (*TSMReader, error)
}
FileStore fileStore
// RateLimit is the limit for disk writes for all concurrent compactions.
RateLimit limiter.Rate
@ -955,15 +957,12 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e
}
// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger, pointsPerBlock int) ([]string, error) {
// Sets the points per block size. The larger this value is set
// the more points there will be in a single index. Under normal
// conditions this should always be 1000 but there is an edge case
// where this is increased.
size := c.Size
if size <= 0 {
size = tsdb.DefaultMaxPointsPerBlock
}
size := pointsPerBlock
c.mu.RLock()
intC := c.compactionsInterrupt
@ -1026,7 +1025,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([
}
// CompactFull writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger) ([]string, error) {
func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger, pointsPerBlock int) ([]string, error) {
c.mu.RLock()
enabled := c.compactionsEnabled
c.mu.RUnlock()
@ -1040,7 +1039,7 @@ func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger) ([]string
}
defer c.remove(tsmFiles)
files, err := c.compact(false, tsmFiles, logger)
files, err := c.compact(false, tsmFiles, logger, pointsPerBlock)
// See if we were disabled while writing a snapshot
c.mu.RLock()
@ -1058,7 +1057,7 @@ func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger) ([]string
}
// CompactFast writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger) ([]string, error) {
func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger, pointsPerBlock int) ([]string, error) {
c.mu.RLock()
enabled := c.compactionsEnabled
c.mu.RUnlock()
@ -1072,7 +1071,7 @@ func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger) ([]string
}
defer c.remove(tsmFiles)
files, err := c.compact(true, tsmFiles, logger)
files, err := c.compact(true, tsmFiles, logger, pointsPerBlock)
// See if we were disabled while writing a snapshot
c.mu.RLock()

View File

@ -124,7 +124,7 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) {
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %#v", err)
}
@ -180,7 +180,7 @@ func TestCompactor_CompactFull(t *testing.T) {
compactor.Dir = dir
compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
@ -191,7 +191,7 @@ func TestCompactor_CompactFull(t *testing.T) {
compactor.Open()
files, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
files, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -290,13 +290,13 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.Dir = dir
compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
require.Error(t, err, "expected error writing snapshot")
require.Zero(t, len(files), "no files should be compacted")
compactor.Open()
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string) error {
@ -335,11 +335,10 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f3}, zap.NewNop())
files, err := compactor.CompactFast([]string{f1, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -415,11 +414,10 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -483,11 +481,10 @@ func TestCompactor_Compact_UnsortedBlocks(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = &fakeFileStore{}
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2}, zap.NewNop())
files, err := compactor.CompactFast([]string{f1, f2}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -557,11 +554,10 @@ func TestCompactor_Compact_UnsortedBlocksOverlapping(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = &fakeFileStore{}
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -629,10 +625,9 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -731,10 +726,9 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -834,10 +828,9 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -942,10 +935,9 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop(), 2)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -1061,7 +1053,7 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
compactor.Open()
// Compact both files, should get 2 files back
files, err := compactor.CompactFull([]string{f1Name, f2Name}, zap.NewNop())
files, err := compactor.CompactFull([]string{f1Name, f2Name}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -1135,7 +1127,7 @@ func TestCompactor_CompactFull_InProgress(t *testing.T) {
defer func() {
assert.NoError(t, f.Close(), "closing in-progress compaction file %s", fileName)
}()
_, err = compactor.CompactFull([]string{f2Name}, zap.NewNop())
_, err = compactor.CompactFull([]string{f2Name}, zap.NewNop(), tsdb.DefaultMaxPointsPerBlock)
assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name)
assert.ErrorContainsf(t, err, "file exists", "unexpected error writing snapshot for %s", f2Name)
assert.Truef(t, errors.Is(err, fs.ErrExist), "error did not indicate file existence: %v", err)
@ -2258,7 +2250,7 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) {
)
expFiles := []tsm1.FileStat{}
tsm, pLen, gLen := cp.PlanOptimize()
tsm, pLen, gLen := cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
@ -2323,7 +2315,19 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 512 * 1024 * 1024,
},
},
[]int{},
// Randomization of block sizes compared to file size
[]int{
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
100,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
100,
10,
5,
},
"not fully compacted and not idle because of more than one generation",
3,
},
@ -2463,144 +2467,13 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 450 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
tsdb.SingleGenerationReasonText,
1,
},
{
// Last files are lower than first files generations
// Mix of less than 4 level files and > level 4 files
"Generations with files under level 4",
[]tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-08.tsm1",
Size: 1048 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-08.tsm1",
Size: 1048 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 600 * 1024 * 1024,
},
{
Path: "03-06.tsm1",
Size: 500 * 1024 * 1024,
},
}, []int{}, "not fully compacted and not idle because of more than one generation", 3,
},
{
// This test will mock a 'backfill' condition where we have a single
// shard with many generations. The initial generation should be fully
// compacted, but we have some new generations that are not. We need to ensure
// the optimize planner will pick these up and compact everything together.
"Backfill mock condition",
[]tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-04.tsm1",
Size: 700 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 500 * 1024 * 1024,
},
{
Path: "02-06.tsm1",
Size: 400 * 1024 * 1024,
},
{
Path: "03-02.tsm1",
Size: 700 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 500 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 400 * 1024 * 1024,
},
{
Path: "04-01.tsm1",
Size: 700 * 1024 * 1024,
},
{
Path: "04-02.tsm1",
Size: 500 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 400 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
// Use some magic numbers but these are just small values for block counts
100,
10,
},
"not fully compacted and not idle because of more than one generation",
4,
},
}
expectedNotFullyCompacted := func(cp *tsm1.DefaultPlanner, reasonExp string, generationCountExp int64) {
@ -2615,11 +2488,11 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
_, cgLen = cp.PlanLevel(3)
require.Zero(t, cgLen, "compaction group length; PlanLevel(3)")
tsmP, pLenP := cp.Plan(time.Now().Add(-time.Second))
tsmP, pLenP := cp.Plan(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
require.Zero(t, len(tsmP), "compaction group; Plan()")
require.Zero(t, pLenP, "compaction group length; Plan()")
_, cgLen, genLen := cp.PlanOptimize()
_, cgLen, genLen := cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
require.Equal(t, int64(1), cgLen, "compaction group length")
require.Equal(t, generationCountExp, genLen, "generation count")
@ -2640,17 +2513,6 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration)
expectedNotFullyCompacted(cp, test.expectedFullyCompactedReasonExp, test.expectedgenerationCount)
// Reverse test files and re-run tests
slices.Reverse(test.fs)
if len(test.bc) > 0 {
slices.Reverse(test.bc)
err := ffs.SetBlockCounts(test.bc)
require.NoError(t, err, "setting reverse block counts")
}
cp = tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration)
expectedNotFullyCompacted(cp, test.expectedFullyCompactedReasonExp, test.expectedgenerationCount)
})
}
@ -2684,9 +2546,9 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 691 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, "", 0,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, "", 0,
},
{
// This test is added to account for a single generation that has a group size
@ -2704,9 +2566,9 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 691 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
"",
0,
},
@ -2728,10 +2590,10 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 450 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, "", 0,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, "", 0,
},
}
@ -2751,7 +2613,7 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
require.Zero(t, len(tsmP), "compaction group; Plan()")
require.Zero(t, pLenP, "compaction group length; Plan()")
cgroup, cgLen, genLen := cp.PlanOptimize()
cgroup, cgLen, genLen := cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
require.Equal(t, []tsm1.CompactionGroup(nil), cgroup, "compaction group")
require.Zero(t, cgLen, "compaction group length")
require.Zero(t, genLen, "generation count")
@ -2825,9 +2687,9 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 691 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, "", 0, true,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, "", 0, true,
},
{
// This test is added to account for a single generation that has a group size
@ -2845,9 +2707,9 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 691 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
},
"",
0, true,
},
@ -2869,10 +2731,10 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
Size: 450 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, tsdb.SingleGenerationReasonText, 1, false,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
}, tsdb.SingleGenerationReasonText, 1, false,
},
}
@ -2924,6 +2786,508 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
mixedPlanOptimizeTestRunner(cp, test.expectedFullyCompactedReasonExp, test.fullyCompacted)
})
}
// The following tests ensure that if Plan() is scheduled
// for a shard than PlanOptimize() should not be scheduled for that shard
type PlanBeforePlanOptimizeTests struct {
name string
fs []tsm1.FileStat
bc []int
}
planBeforePlanOptimized := []PlanBeforePlanOptimizeTests{
{
"Generations with files under level 4",
[]tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-08.tsm1",
Size: 1048 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-08.tsm1",
Size: 1048 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 600 * 1024 * 1024,
},
{
Path: "03-06.tsm1",
Size: 500 * 1024 * 1024,
},
}, []int{},
},
{
// This test will mock a 'backfill' condition where we have a single
// shard with many generations. The initial generation should be fully
// compacted, but we have some new generations that are not. We need to ensure
// the optimize planner will pick these up and compact everything together.
"Backfill mock condition",
[]tsm1.FileStat{
{
Path: "01-05.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-06.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "01-07.tsm1",
Size: 2048 * 1024 * 1024,
},
{
Path: "02-04.tsm1",
Size: 700 * 1024 * 1024,
},
{
Path: "02-05.tsm1",
Size: 500 * 1024 * 1024,
},
{
Path: "02-06.tsm1",
Size: 400 * 1024 * 1024,
},
{
Path: "03-02.tsm1",
Size: 700 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 500 * 1024 * 1024,
},
{
Path: "03-04.tsm1",
Size: 400 * 1024 * 1024,
},
{
Path: "04-01.tsm1",
Size: 700 * 1024 * 1024,
},
{
Path: "04-02.tsm1",
Size: 500 * 1024 * 1024,
},
{
Path: "03-03.tsm1",
Size: 400 * 1024 * 1024,
},
}, []int{
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultAggressiveMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
tsdb.DefaultMaxPointsPerBlock,
// Use some magic numbers but these are just small values for block counts
100,
10,
},
},
{
"1.12.0 RC0 Planner issue mock data from cluster",
[]tsm1.FileStat{
{
Path: "000029202-000000004.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000005.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000006.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000007.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000008.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000009.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000010.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000011.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000012.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000013.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000014.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000015.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000016.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000017.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000018.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000019.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000020.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000021.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000022.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000023.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000024.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000025.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000026.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000027.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000028.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000029.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000030.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000031.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000032.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000033.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000034.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000035.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000036.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000037.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000038.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000039.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000040.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000041.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000042.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000043.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000044.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000045.tsm",
Size: 2254857830,
},
{
Path: "000029202-000000046.tsm",
Size: 161480704,
},
{
Path: "000029235-000000003.tsm",
Size: 96468992,
},
{
Path: "000029267-000000003.tsm",
Size: 109051904,
},
{
Path: "000029268-000000001.tsm",
Size: 3040870,
},
{
Path: "000029268-000000002.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000003.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000004.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000005.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000006.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000007.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000008.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000009.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000010.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000011.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000012.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000013.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000014.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000015.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000016.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000017.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000018.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000019.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000020.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000021.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000022.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000023.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000024.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000025.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000026.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000027.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000028.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000029.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000030.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000031.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000032.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000033.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000034.tsm",
Size: 2254857830,
},
{
Path: "000029268-000000035.tsm",
Size: 1717986918,
},
}, []int{},
},
}
planBeforePlanOptimizedRunner := func(cp *tsm1.DefaultPlanner) {
// Ensure that no level planning takes place
_, cgLen := cp.PlanLevel(1)
require.Zero(t, cgLen, "compaction group length; PlanLevel(1)")
_, cgLen = cp.PlanLevel(2)
require.Zero(t, cgLen, "compaction group length; PlanLevel(2)")
_, cgLen = cp.PlanLevel(3)
require.Zero(t, cgLen, "compaction group length; PlanLevel(3)")
// Plan should schedule
tsmP, pLenP := cp.Plan(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
require.Equal(t, 1, len(tsmP), "compaction group; Plan()")
require.Equal(t, int64(1), pLenP, "compaction group length; Plan()")
// PlanOptimize should not schedule
cgroup, cgLen, genLen := cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
require.Equal(t, []tsm1.CompactionGroup(nil), cgroup, "compaction group")
require.Zero(t, cgLen, "compaction group length")
require.Zero(t, genLen, "generation count")
}
for _, test := range planBeforePlanOptimized {
t.Run(test.name, func(t *testing.T) {
ffs := &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return test.fs
},
}
if len(test.bc) > 0 {
err := ffs.SetBlockCounts(test.bc)
require.NoError(t, err, "setting block counts")
}
cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration)
planBeforePlanOptimizedRunner(cp)
})
}
}
func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) {
@ -2952,7 +3316,7 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) {
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2]}
tsm, pLen, _ := cp.PlanOptimize()
tsm, pLen, _ := cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if pLen != int64(len(tsm)) {
@ -3140,7 +3504,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
}
cp.Release(plan)
plan, pLen, _ = cp.PlanOptimize()
plan, pLen, _ = cp.PlanOptimize(time.Now().Add(-tsdb.DefaultCompactFullWriteColdDuration + 1))
// ensure the optimize planner would pick this up
if exp, got := 1, len(plan); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)

View File

@ -2134,7 +2134,12 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
// If no full compactions are need, see if an optimize is needed
var genLen int64
if len(level4Groups) == 0 {
level4Groups, len4, genLen = e.CompactionPlan.PlanOptimize()
level4Groups, len4, genLen = e.CompactionPlan.PlanOptimize(e.LastModified())
if len(level4Groups) > 0 {
for _, group := range level4Groups {
e.logger.Info("TSM scheduled for optimized compaction", zap.Strings("files", group))
}
}
atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4)
}
@ -2157,18 +2162,19 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
if level, runnable := e.scheduler.next(); runnable {
switch level {
case 1:
if e.compactHiPriorityLevel(level1Groups[0], 1, false, wg) {
if e.compactHiPriorityLevel(level1Groups[0], 1, false, tsdb.DefaultMaxPointsPerBlock, wg) {
level1Groups = level1Groups[1:]
}
case 2:
if e.compactHiPriorityLevel(level2Groups[0], 2, false, wg) {
if e.compactHiPriorityLevel(level2Groups[0], 2, false, tsdb.DefaultMaxPointsPerBlock, wg) {
level2Groups = level2Groups[1:]
}
case 3:
if e.compactLoPriorityLevel(level3Groups[0], 3, true, wg) {
if e.compactLoPriorityLevel(level3Groups[0], 3, true, tsdb.DefaultMaxPointsPerBlock, wg) {
level3Groups = level3Groups[1:]
}
case 4:
var pointsPerBlock int
// This is a heuristic. The 10_000 points per block default is suitable for when we have a
// single generation with multiple files at max block size under 2 GB.
if genLen == 1 {
@ -2176,11 +2182,20 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
for _, f := range level4Groups[0] {
e.logger.Info("TSM optimized compaction on single generation running, increasing total points per block.", zap.String("path", f), zap.Int("points-per-block", e.CompactionPlan.GetAggressiveCompactionPointsPerBlock()))
}
e.Compactor.Size = e.CompactionPlan.GetAggressiveCompactionPointsPerBlock()
pointsPerBlock = e.CompactionPlan.GetAggressiveCompactionPointsPerBlock()
} else {
e.Compactor.Size = tsdb.DefaultMaxPointsPerBlock
pointsPerBlock = tsdb.DefaultMaxPointsPerBlock
for _, group := range level4Groups {
for _, f := range group {
if tsmPointsPerBlock := e.Compactor.FileStore.BlockCount(f, 1); tsmPointsPerBlock >= e.CompactionPlan.GetAggressiveCompactionPointsPerBlock() {
pointsPerBlock = e.CompactionPlan.GetAggressiveCompactionPointsPerBlock()
e.logger.Info("TSM compaction on shard with increased points per block.", zap.String("path", f), zap.Int("points-per-block", e.CompactionPlan.GetAggressiveCompactionPointsPerBlock()))
break
}
}
}
}
if e.compactFull(level4Groups[0], wg) {
if e.compactFull(level4Groups[0], pointsPerBlock, wg) {
level4Groups = level4Groups[1:]
}
}
@ -2197,7 +2212,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
// compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
// true if the compaction was started
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool, pointsPerBlock int, wg *sync.WaitGroup) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
@ -2213,7 +2228,7 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast boo
defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
defer e.compactionLimiter.Release()
s.Apply()
s.Apply(pointsPerBlock)
// Release the files in the compaction plan
e.CompactionPlan.Release([]CompactionGroup{s.group})
}()
@ -2226,7 +2241,7 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast boo
// compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
// the plans that were not able to be started
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool, pointsPerBlock int, wg *sync.WaitGroup) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
@ -2240,7 +2255,7 @@ func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast boo
defer wg.Done()
defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
defer e.compactionLimiter.Release()
s.Apply()
s.Apply(pointsPerBlock)
// Release the files in the compaction plan
e.CompactionPlan.Release([]CompactionGroup{s.group})
}()
@ -2251,7 +2266,7 @@ func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast boo
// compactFull kicks off full and optimize compactions using the lo priority policy. It returns
// the plans that were not able to be started.
func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
func (e *Engine) compactFull(grp CompactionGroup, pointsPerBlock int, wg *sync.WaitGroup) bool {
s := e.fullCompactionStrategy(grp, false)
if s == nil {
return false
@ -2265,7 +2280,7 @@ func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
defer wg.Done()
defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1)
defer e.compactionLimiter.Release()
s.Apply()
s.Apply(pointsPerBlock)
// Release the files in the compaction plan
e.CompactionPlan.Release([]CompactionGroup{s.group})
}()
@ -2294,14 +2309,14 @@ type compactionStrategy struct {
}
// Apply concurrently compacts all the groups in a compaction strategy.
func (s *compactionStrategy) Apply() {
func (s *compactionStrategy) Apply(pointsPerBlock int) {
start := time.Now()
s.compactGroup()
s.compactGroup(pointsPerBlock)
atomic.AddInt64(s.durationStat, time.Since(start).Nanoseconds())
}
// compactGroup executes the compaction strategy against a single CompactionGroup.
func (s *compactionStrategy) compactGroup() {
func (s *compactionStrategy) compactGroup(pointsPerBlock int) {
group := s.group
log, logEnd := logger.NewOperation(s.logger, "TSM compaction", "tsm1_compact_group", logger.Shard(s.engine.id))
defer logEnd()
@ -2316,9 +2331,9 @@ func (s *compactionStrategy) compactGroup() {
files []string
)
if s.fast {
files, err = s.compactor.CompactFast(group, log)
files, err = s.compactor.CompactFast(group, log, pointsPerBlock)
} else {
files, err = s.compactor.CompactFull(group, log)
files, err = s.compactor.CompactFull(group, log, pointsPerBlock)
}
if err != nil {

View File

@ -2893,11 +2893,15 @@ func (m *mockPlanner) SetAggressiveCompactionPointsPerBlock(aggressiveCompaction
}
func (m *mockPlanner) Plan(lastWrite time.Time) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
func (m *mockPlanner) PlanLevel(level int) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64, int64) { return nil, 0, 0 }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
func (m *mockPlanner) FullyCompacted() (bool, string) { return false, "not compacted" }
func (m *mockPlanner) ForceFull() {}
func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}
func (m *mockPlanner) PlanOptimize(lastWrite time.Time) ([]tsm1.CompactionGroup, int64, int64) {
return nil, 0, 0
}
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
func (m *mockPlanner) FullyCompacted() (bool, string) {
return false, "not compacted"
}
func (m *mockPlanner) ForceFull() {}
func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}
// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
func ParseTags(s string) query.Tags {