feat: Add CompactPointsPerBlock config opt (#26100)
* feat: Add CompactPointsPerBlock config opt. This PR adds an additional parameter for influxd, CompactPointsPerBlock, and adjusts DefaultAggressiveMaxPointsPerBlock to 10,000. We had discovered that with the points per block set to 100,000, compacted TSM file sizes were increasing. After modifying the points per block to 10,000, we noticed that the file sizes decreased. The value has been exposed as a parameter that can be adjusted by administrators; this allows for some tuning if compression problems are encountered. pull/26143/head
parent
2ab5aad52e
commit
d8bcbd894c
|
@ -124,6 +124,11 @@
|
|||
# will allow TSM compactions to write to disk.
|
||||
# compact-throughput-burst = "48m"
|
||||
|
||||
# AggressivePointsPerBlock is the points per block to be used when aggressive
|
||||
# compaction is used. There are certain cases where TSM files do not get
|
||||
# fully compacted. This adjusts an internal parameter that alleviates that.
|
||||
# aggressive-points-per-block = 10000
|
||||
|
||||
# If true, then the mmap advise value MADV_WILLNEED will be provided to the kernel with respect to
|
||||
# TSM files. This setting has been found to be problematic on some kernels, and defaults to off.
|
||||
# It might help users who have slow disks in some cases.
|
||||
|
|
|
@ -52,9 +52,9 @@ const (
|
|||
// block in a TSM file
|
||||
DefaultMaxPointsPerBlock = 1000
|
||||
|
||||
// AggressiveMaxPointsPerBlock is used when we want to further compact blocks
|
||||
// it is 100 times the default amount of points we use per block
|
||||
AggressiveMaxPointsPerBlock = DefaultMaxPointsPerBlock * 100
|
||||
// DefaultAggressiveMaxPointsPerBlock is used when we want to further compact blocks
|
||||
// it is 10 times the default amount of points we use per block
|
||||
DefaultAggressiveMaxPointsPerBlock = DefaultMaxPointsPerBlock * 10
|
||||
|
||||
// DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
|
||||
// This limit only applies to the "inmem" index.
|
||||
|
@ -92,7 +92,7 @@ var SingleGenerationReasonText string = SingleGenerationReason()
|
|||
// when checked for full compaction.
|
||||
// 1048576000 is a magic number for bytes per gigabyte.
|
||||
func SingleGenerationReason() string {
|
||||
return fmt.Sprintf("not fully compacted and not idle because single generation with more than 2 files under %d GB and more than 1 file(s) under aggressive compaction points per block count (%d points)", int(MaxTSMFileSize/1048576000), AggressiveMaxPointsPerBlock)
|
||||
return fmt.Sprintf("not fully compacted and not idle because single generation with more than 2 files under %d GB and more than 1 file(s) under aggressive compaction points per block count (default: %d points)", int(MaxTSMFileSize/1048576000), DefaultAggressiveMaxPointsPerBlock)
|
||||
}
|
||||
|
||||
// Config holds the configuration for the tsdb package.
|
||||
|
@ -128,6 +128,7 @@ type Config struct {
|
|||
CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"`
|
||||
CompactThroughput toml.Size `toml:"compact-throughput"`
|
||||
CompactThroughputBurst toml.Size `toml:"compact-throughput-burst"`
|
||||
AggressivePointsPerBlock toml.Size `toml:"aggressive-points-per-block"`
|
||||
|
||||
// Options for ingress metrics
|
||||
IngressMetricByMeasurement bool `toml:"ingress-metric-by-measurement-enabled"`
|
||||
|
@ -197,6 +198,7 @@ func NewConfig() Config {
|
|||
CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration),
|
||||
CompactThroughput: toml.Size(DefaultCompactThroughput),
|
||||
CompactThroughputBurst: toml.Size(DefaultCompactThroughputBurst),
|
||||
AggressivePointsPerBlock: toml.Size(DefaultAggressiveMaxPointsPerBlock),
|
||||
|
||||
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
|
||||
MaxValuesPerTag: DefaultMaxValuesPerTag,
|
||||
|
|
|
@ -124,6 +124,10 @@ type CompactionPlanner interface {
|
|||
ForceFull()
|
||||
|
||||
SetFileStore(fs *FileStore)
|
||||
|
||||
SetAggressiveCompactionPointsPerBlock(aggressiveCompactionPointsPerBlock int)
|
||||
|
||||
GetAggressiveCompactionPointsPerBlock() int
|
||||
}
|
||||
|
||||
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
|
||||
|
@ -157,6 +161,10 @@ type DefaultPlanner struct {
|
|||
// filesInUse is the set of files that have been returned as part of a plan and might
|
||||
// be being compacted. Two plans should not return the same file at any given time.
|
||||
filesInUse map[string]struct{}
|
||||
|
||||
// aggressiveCompactionPointsPerBlock is the amount of points that should be
|
||||
// packed in to a TSM file block during aggressive compaction
|
||||
aggressiveCompactionPointsPerBlock int
|
||||
}
|
||||
|
||||
type fileStore interface {
|
||||
|
@ -168,9 +176,10 @@ type fileStore interface {
|
|||
|
||||
func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
|
||||
return &DefaultPlanner{
|
||||
FileStore: fs,
|
||||
compactFullWriteColdDuration: writeColdDuration,
|
||||
filesInUse: make(map[string]struct{}),
|
||||
FileStore: fs,
|
||||
compactFullWriteColdDuration: writeColdDuration,
|
||||
filesInUse: make(map[string]struct{}),
|
||||
aggressiveCompactionPointsPerBlock: tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -228,6 +237,14 @@ func (t *tsmGeneration) hasTombstones() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (c *DefaultPlanner) SetAggressiveCompactionPointsPerBlock(aggressiveCompactionPointsPerBlock int) {
|
||||
c.aggressiveCompactionPointsPerBlock = aggressiveCompactionPointsPerBlock
|
||||
}
|
||||
|
||||
func (c *DefaultPlanner) GetAggressiveCompactionPointsPerBlock() int {
|
||||
return c.aggressiveCompactionPointsPerBlock
|
||||
}
|
||||
|
||||
func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
|
||||
c.FileStore = fs
|
||||
}
|
||||
|
@ -253,7 +270,7 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
|
|||
aggressivePointsPerBlockCount := 0
|
||||
filesUnderMaxTsmSizeCount := 0
|
||||
for _, tsmFile := range gens[0].files {
|
||||
if c.FileStore.BlockCount(tsmFile.Path, 1) >= tsdb.AggressiveMaxPointsPerBlock {
|
||||
if c.FileStore.BlockCount(tsmFile.Path, 1) >= c.aggressiveCompactionPointsPerBlock {
|
||||
aggressivePointsPerBlockCount++
|
||||
}
|
||||
if tsmFile.Size < tsdb.MaxTSMFileSize {
|
||||
|
|
|
@ -2399,7 +2399,7 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
// > 2 GB total group size
|
||||
// 50% of files are at aggressive max block size
|
||||
{
|
||||
"Small group size with single generation 50% at DefaultMaxPointsPerBlock and 50% at AggressiveMaxPointsPerBlock",
|
||||
"Small group size with single generation 50% at DefaultMaxPointsPerBlock and 50% at DefaultAggressiveMaxPointsPerBlock",
|
||||
[]tsm1.FileStat{
|
||||
{
|
||||
Path: "01-05.tsm1",
|
||||
|
@ -2435,10 +2435,10 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
},
|
||||
},
|
||||
[]int{
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
|
@ -2463,7 +2463,7 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
Size: 450 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
},
|
||||
|
@ -2581,16 +2581,16 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
Size: 400 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
|
@ -2673,7 +2673,7 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
// This test is added to account for a single generation that has a group size
|
||||
// over 2 GB with 1 file under 2 GB all at max points per block with aggressive compaction.
|
||||
// It should not compact any further.
|
||||
"TSM files at AggressiveMaxPointsPerBlock",
|
||||
"TSM files at DefaultAggressiveMaxPointsPerBlock",
|
||||
[]tsm1.FileStat{
|
||||
{
|
||||
Path: "01-13.tsm1",
|
||||
|
@ -2684,8 +2684,8 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
Size: 691 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
}, "", 0,
|
||||
},
|
||||
{
|
||||
|
@ -2704,7 +2704,7 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
Size: 691 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
},
|
||||
"",
|
||||
|
@ -2713,7 +2713,7 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
{
|
||||
// This test is added to account for a single generation that has a group size
|
||||
// over 2 GB and multiple files under 2 GB all at max points per block for aggressive compaction.
|
||||
"Group size over 2 with multiple files under 2GB and at AggressiveMaxPointsPerBlock",
|
||||
"Group size over 2 with multiple files under 2GB and at DefaultAggressiveMaxPointsPerBlock",
|
||||
[]tsm1.FileStat{
|
||||
{
|
||||
Path: "01-13.tsm1",
|
||||
|
@ -2728,9 +2728,9 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
Size: 450 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.AggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
}, "", 0,
|
||||
},
|
||||
}
|
||||
|
@ -2785,6 +2785,145 @@ func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) {
|
|||
expectedFullyCompacted(cp, test.expectedFullyCompactedReasonExp)
|
||||
})
|
||||
}
|
||||
|
||||
type PlanOptimizeMixedTests struct {
|
||||
name string
|
||||
fs []tsm1.FileStat
|
||||
bc []int
|
||||
expectedFullyCompactedReasonExp string
|
||||
expectedgenerationCount int64
|
||||
fullyCompacted bool
|
||||
}
|
||||
|
||||
mixedPlanOptimizeTests := []PlanOptimizeMixedTests{
|
||||
{
|
||||
// This test is added to account for halting state after
|
||||
// TestDefaultPlanner_FullyCompacted_SmallSingleGeneration
|
||||
// will need to ensure that once we have single TSM file under 2 GB we stop
|
||||
"Single TSM file with increase block count",
|
||||
[]tsm1.FileStat{
|
||||
{
|
||||
Path: "01-09.tsm1",
|
||||
Size: 650 * 1024 * 1024,
|
||||
},
|
||||
},
|
||||
[]int{},
|
||||
"", 0, true,
|
||||
},
|
||||
{
|
||||
// This test is added to account for a single generation that has a group size
|
||||
// over 2 GB with 1 file under 2 GB all at max points per block with aggressive compaction.
|
||||
// It should not compact any further.
|
||||
"TSM files at DefaultAggressiveMaxPointsPerBlock with increased block count",
|
||||
[]tsm1.FileStat{
|
||||
{
|
||||
Path: "01-13.tsm1",
|
||||
Size: 2048 * 1024 * 1024,
|
||||
},
|
||||
{
|
||||
Path: "01-14.tsm1",
|
||||
Size: 691 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
}, "", 0, true,
|
||||
},
|
||||
{
|
||||
// This test is added to account for a single generation that has a group size
|
||||
// over 2 GB at max points per block with aggressive compaction, and, 1 file
|
||||
// under 2 GB at default max points per block.
|
||||
// It should not compact any further.
|
||||
"TSM files cannot compact further, single file under 2G and at DefaultMaxPointsPerBlock with increased block count",
|
||||
[]tsm1.FileStat{
|
||||
{
|
||||
Path: "01-13.tsm1",
|
||||
Size: 2048 * 1024 * 1024,
|
||||
},
|
||||
{
|
||||
Path: "01-14.tsm1",
|
||||
Size: 691 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultMaxPointsPerBlock,
|
||||
},
|
||||
"",
|
||||
0, true,
|
||||
},
|
||||
{
|
||||
// This test is added to account for a single generation that has a group size
|
||||
// over 2 GB and multiple files under 2 GB all at max points per block for aggressive compaction.
|
||||
"Group size over 2 with multiple files under 2GB and at DefaultAggressiveMaxPointsPerBlock with increased block count",
|
||||
[]tsm1.FileStat{
|
||||
{
|
||||
Path: "01-13.tsm1",
|
||||
Size: 2048 * 1024 * 1024,
|
||||
},
|
||||
{
|
||||
Path: "01-14.tsm1",
|
||||
Size: 650 * 1024 * 1024,
|
||||
},
|
||||
{
|
||||
Path: "01-15.tsm1",
|
||||
Size: 450 * 1024 * 1024,
|
||||
},
|
||||
}, []int{
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
tsdb.DefaultAggressiveMaxPointsPerBlock,
|
||||
}, tsdb.SingleGenerationReasonText, 1, false,
|
||||
},
|
||||
}
|
||||
|
||||
mixedPlanOptimizeTestRunner := func(cp *tsm1.DefaultPlanner, reasonExp string, fullyCompacted bool) {
|
||||
compacted, reason := cp.FullyCompacted()
|
||||
require.Equal(t, reason, reasonExp, "fullyCompacted reason")
|
||||
require.Equal(t, compacted, fullyCompacted, "is fully compacted")
|
||||
|
||||
// Ensure that no level planning takes place
|
||||
_, cgLen := cp.PlanLevel(1)
|
||||
require.Zero(t, cgLen, "compaction group length; PlanLevel(1)")
|
||||
_, cgLen = cp.PlanLevel(2)
|
||||
require.Zero(t, cgLen, "compaction group length; PlanLevel(2)")
|
||||
_, cgLen = cp.PlanLevel(3)
|
||||
require.Zero(t, cgLen, "compaction group length; PlanLevel(3)")
|
||||
}
|
||||
|
||||
// These tests will increase the max points per block for aggressive compaction.
|
||||
// For SetAggressiveCompactionPointsPerBlock we are using 10x the default to
|
||||
// mock an administrator setting the max points per block to 100_000 and overriding
|
||||
// the default of 10_000.
|
||||
for _, test := range mixedPlanOptimizeTests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ffs := &fakeFileStore{
|
||||
PathsFn: func() []tsm1.FileStat {
|
||||
return test.fs
|
||||
},
|
||||
}
|
||||
|
||||
if len(test.bc) > 0 {
|
||||
err := ffs.SetBlockCounts(test.bc)
|
||||
require.NoError(t, err, "setting block counts")
|
||||
}
|
||||
|
||||
cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration)
|
||||
cp.SetAggressiveCompactionPointsPerBlock(tsdb.DefaultAggressiveMaxPointsPerBlock * 10)
|
||||
mixedPlanOptimizeTestRunner(cp, test.expectedFullyCompactedReasonExp, test.fullyCompacted)
|
||||
|
||||
// Reverse test files and re-run tests
|
||||
slices.Reverse(test.fs)
|
||||
if len(test.bc) > 0 {
|
||||
slices.Reverse(test.bc)
|
||||
err := ffs.SetBlockCounts(test.bc)
|
||||
require.NoError(t, err, "setting reverse block counts")
|
||||
}
|
||||
|
||||
cp = tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration)
|
||||
cp.SetAggressiveCompactionPointsPerBlock(tsdb.DefaultAggressiveMaxPointsPerBlock * 10)
|
||||
mixedPlanOptimizeTestRunner(cp, test.expectedFullyCompactedReasonExp, test.fullyCompacted)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) {
|
||||
|
|
|
@ -230,6 +230,8 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
|
|||
c.RateLimit = opt.CompactionThroughputLimiter
|
||||
|
||||
var planner CompactionPlanner = NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration))
|
||||
planner.SetAggressiveCompactionPointsPerBlock(int(opt.Config.AggressivePointsPerBlock))
|
||||
|
||||
if opt.CompactionPlannerCreator != nil {
|
||||
planner = opt.CompactionPlannerCreator(opt.Config).(CompactionPlanner)
|
||||
planner.SetFileStore(fs)
|
||||
|
@ -2167,14 +2169,14 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
|
|||
level3Groups = level3Groups[1:]
|
||||
}
|
||||
case 4:
|
||||
// This is a heuristic. 100_000 points per block is suitable for when we have a
|
||||
// This is a heuristic. The 10_000 points per block default is suitable for when we have a
|
||||
// single generation with multiple files at max block size under 2 GB.
|
||||
if genLen == 1 {
|
||||
// Log TSM files that will have an increased points per block count.
|
||||
for _, f := range level4Groups[0] {
|
||||
e.logger.Info("TSM optimized compaction on single generation running, increasing total points per block to 100_000.", zap.String("path", f))
|
||||
e.logger.Info("TSM optimized compaction on single generation running, increasing total points per block.", zap.String("path", f), zap.Int("points-per-block", e.CompactionPlan.GetAggressiveCompactionPointsPerBlock()))
|
||||
}
|
||||
e.Compactor.Size = tsdb.AggressiveMaxPointsPerBlock
|
||||
e.Compactor.Size = e.CompactionPlan.GetAggressiveCompactionPointsPerBlock()
|
||||
} else {
|
||||
e.Compactor.Size = tsdb.DefaultMaxPointsPerBlock
|
||||
}
|
||||
|
|
|
@ -2887,6 +2887,10 @@ func MustParsePointString(buf string) models.Point { return MustParsePointsStrin
|
|||
|
||||
type mockPlanner struct{}
|
||||
|
||||
func (m *mockPlanner) GetAggressiveCompactionPointsPerBlock() int { return 0 }
|
||||
func (m *mockPlanner) SetAggressiveCompactionPointsPerBlock(aggressiveCompactionPointsPerBlock int) {
|
||||
return
|
||||
}
|
||||
func (m *mockPlanner) Plan(lastWrite time.Time) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
|
||||
func (m *mockPlanner) PlanLevel(level int) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
|
||||
func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64, int64) { return nil, 0, 0 }
|
||||
|
|
Loading…
Reference in New Issue