fix: Separate L0 and Mix trigger interval (#37190)

See also: #37108

- Add MixCompactionTriggerInterval, default 60s
- Add L0CompactionTriggerInterval, default 10s
- Export Single related compaction configs
- Raise SingleCompactionDeltaLogMaxSize from 2MB to 16MB

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/37561/head
XuanYang-cn 2024-11-12 10:56:37 +08:00 committed by GitHub
parent 2a4c00de9d
commit a45a288a25
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 76 additions and 34 deletions

View File

@ -63,7 +63,7 @@ etcd:
password: # password for etcd authentication password: # password for etcd authentication
metastore: metastore:
type: etcd # Default value: etcd, Valid values: [etcd, tikv] type: etcd # Default value: etcd, Valid values: [etcd, tikv]
snapshot: snapshot:
ttl: 86400 # snapshot ttl in seconds ttl: 86400 # snapshot ttl in seconds
reserveTime: 3600 # snapshot reserve time in seconds reserveTime: 3600 # snapshot reserve time in seconds
@ -564,6 +564,23 @@ dataCoord:
maxParallelTaskNum: 10 maxParallelTaskNum: 10
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
gcInterval: 1800 # The time interval in seconds for compaction gc gcInterval: 1800 # The time interval in seconds for compaction gc
mix:
triggerInterval: 60 # The time interval in seconds to trigger mix compaction
levelzero:
triggerInterval: 10 # The time interval in seconds for trigger L0 compaction
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
deltalog:
maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB
maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200
expiredlog:
maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB
clustering: clustering:
enable: true # Enable clustering compaction enable: true # Enable clustering compaction
autoEnable: false # Enable auto clustering compaction autoEnable: false # Enable auto clustering compaction
@ -579,12 +596,6 @@ dataCoord:
minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train
maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train
maxClusterSize: 5g # maximum cluster size in Kmeans train maxClusterSize: 5g # maximum cluster size in Kmeans train
levelzero:
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
syncSegmentsInterval: 300 # The time interval for regularly syncing segments syncSegmentsInterval: 300 # The time interval for regularly syncing segments
enableGarbageCollection: true # Switch value to control if to enable garbage collection to clear the discarded data in MinIO or S3 service. enableGarbageCollection: true # Switch value to control if to enable garbage collection to clear the discarded data in MinIO or S3 service.
gc: gc:
@ -1077,4 +1088,4 @@ knowhere:
pq_code_budget_gb_ratio: 0.125 # Size limit on the PQ code (compared with raw data) pq_code_budget_gb_ratio: 0.125 # Size limit on the PQ code (compared with raw data)
search_cache_budget_gb_ratio: 0.1 # Ratio of cached node numbers to raw data search_cache_budget_gb_ratio: 0.1 # Ratio of cached node numbers to raw data
search: # Diskann search params search: # Diskann search params
beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number. beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number

View File

@ -109,7 +109,7 @@ func newCompactionTrigger(
} }
func (t *compactionTrigger) start() { func (t *compactionTrigger) start() {
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
t.closeWaiter.Add(2) t.closeWaiter.Add(2)
go func() { go func() {
defer logutil.LogPanic() defer logutil.LogPanic()
@ -132,8 +132,6 @@ func (t *compactionTrigger) start() {
default: default:
// no need to handle err in handleSignal // no need to handle err in handleSignal
t.handleSignal(signal) t.handleSignal(signal)
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
} }
} }
} }

View File

@ -108,11 +108,11 @@ func (m *CompactionTriggerManager) startLoop() {
defer logutil.LogPanic() defer logutil.LogPanic()
defer m.closeWg.Done() defer m.closeWg.Done()
l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop() defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second)) clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
defer clusteringTicker.Stop() defer clusteringTicker.Stop()
singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
defer singleTicker.Stop() defer singleTicker.Stop()
log.Info("Compaction trigger manager start") log.Info("Compaction trigger manager start")
for { for {

View File

@ -3202,25 +3202,29 @@ type dataCoordConfig struct {
CompactionTaskPrioritizer ParamItem `refreshable:"true"` CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"` CompactionTaskQueueCapacity ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"` CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"` CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"` CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"` MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"` MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"` SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"` SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"` SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"` CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"` SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"` SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
ChannelCheckpointMaxLag ParamItem `refreshable:"true"` ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"` SyncSegmentsInterval ParamItem `refreshable:"false"`
// Clustering Compaction // Clustering Compaction
ClusteringCompactionEnable ParamItem `refreshable:"true"` ClusteringCompactionEnable ParamItem `refreshable:"true"`
@ -3592,13 +3596,17 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.ratio.threshold", Key: "dataCoord.compaction.single.ratio.threshold",
Version: "2.0.0", Version: "2.0.0",
DefaultValue: "0.2", DefaultValue: "0.2",
Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2",
Export: true,
} }
p.SingleCompactionRatioThreshold.Init(base.mgr) p.SingleCompactionRatioThreshold.Init(base.mgr)
p.SingleCompactionDeltaLogMaxSize = ParamItem{ p.SingleCompactionDeltaLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxsize", Key: "dataCoord.compaction.single.deltalog.maxsize",
Version: "2.0.0", Version: "2.0.0",
DefaultValue: strconv.Itoa(2 * 1024 * 1024), DefaultValue: "16777216",
Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB",
Export: true,
} }
p.SingleCompactionDeltaLogMaxSize.Init(base.mgr) p.SingleCompactionDeltaLogMaxSize.Init(base.mgr)
@ -3606,6 +3614,8 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.expiredlog.maxsize", Key: "dataCoord.compaction.single.expiredlog.maxsize",
Version: "2.0.0", Version: "2.0.0",
DefaultValue: "10485760", DefaultValue: "10485760",
Doc: "The expired log size of a segment to trigger a compaction, default as 10MB",
Export: true,
} }
p.SingleCompactionExpiredLogMaxSize.Init(base.mgr) p.SingleCompactionExpiredLogMaxSize.Init(base.mgr)
@ -3613,6 +3623,8 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.deltalog.maxnum", Key: "dataCoord.compaction.single.deltalog.maxnum",
Version: "2.0.0", Version: "2.0.0",
DefaultValue: "200", DefaultValue: "200",
Doc: "The deltalog count of a segment to trigger a compaction, default as 200",
Export: true,
} }
p.SingleCompactionDeltalogMaxNum.Init(base.mgr) p.SingleCompactionDeltalogMaxNum.Init(base.mgr)
@ -3620,9 +3632,28 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.global.interval", Key: "dataCoord.compaction.global.interval",
Version: "2.0.0", Version: "2.0.0",
DefaultValue: "60", DefaultValue: "60",
Doc: "deprecated",
} }
p.GlobalCompactionInterval.Init(base.mgr) p.GlobalCompactionInterval.Init(base.mgr)
p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds to trigger mix compaction",
DefaultValue: "60",
Export: true,
}
p.MixCompactionTriggerInterval.Init(base.mgr)
p.L0CompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.levelzero.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds for trigger L0 compaction",
DefaultValue: "10",
Export: true,
}
p.L0CompactionTriggerInterval.Init(base.mgr)
p.ChannelCheckpointMaxLag = ParamItem{ p.ChannelCheckpointMaxLag = ParamItem{
Key: "dataCoord.compaction.channelMaxCPLag", Key: "dataCoord.compaction.channelMaxCPLag",
Version: "2.4.0", Version: "2.4.0",

View File

@ -465,7 +465,7 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
Key: "metastore.type", Key: "metastore.type",
Version: "2.2.0", Version: "2.2.0",
DefaultValue: util.MetaStoreTypeEtcd, DefaultValue: util.MetaStoreTypeEtcd,
Doc: `Default value: etcd, Valid values: [etcd, tikv] `, Doc: `Default value: etcd, Valid values: [etcd, tikv]`,
Export: true, Export: true,
} }
p.MetaStoreType.Init(base.mgr) p.MetaStoreType.Init(base.mgr)

View File

@ -33,13 +33,15 @@ func (s *CompactionSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite() s.MiniClusterSuite.SetupSuite()
paramtable.Init() paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1") paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
} }
func (s *CompactionSuite) TearDownSuite() { func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite() s.MiniClusterSuite.TearDownSuite()
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
} }
func TestCompaction(t *testing.T) { func TestCompaction(t *testing.T) {

View File

@ -273,9 +273,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
func TestL2SingleCompaction(t *testing.T) { func TestL2SingleCompaction(t *testing.T) {
paramtable.Init() paramtable.Init()
// to speed up the test // to speed up the test
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10") paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0") paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)
suite.Run(t, new(L2SingleCompactionSuite)) suite.Run(t, new(L2SingleCompactionSuite))