mirror of https://github.com/milvus-io/milvus.git
enhance: Rename config of sealing by growing segmetns size (#34787)
/kind improvement --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/34851/head
parent
7eaef09ba5
commit
b22e549844
|
@ -443,9 +443,6 @@ dataCoord:
|
|||
# The max number of binlog file for one segment, the segment will be sealed if
|
||||
# the number of binlog file reaches to max value.
|
||||
maxBinlogFileNumber: 32
|
||||
# The size threshold in MB, if the total size of growing segments
|
||||
# exceeds this threshold, the largest growing segment will be sealed.
|
||||
totalGrowingSizeThresholdInMB: 4096
|
||||
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
|
||||
# (smallProportion * segment max # of rows).
|
||||
# A compaction will happen on small segments if the segment after compaction will have
|
||||
|
@ -455,6 +452,11 @@ dataCoord:
|
|||
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
|
||||
expansionRate: 1.25
|
||||
segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment
|
||||
sealPolicy:
|
||||
channel:
|
||||
# The size threshold in MB, if the total size of growing segments of each shard
|
||||
# exceeds this threshold, the largest growing segment will be sealed.
|
||||
growingSegmentsMemSize: 4096
|
||||
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
|
||||
enableCompaction: true # Enable data segment compaction
|
||||
compaction:
|
||||
|
|
|
@ -205,7 +205,7 @@ func sealByTotalGrowingSegmentsSize() channelSealPolicy {
|
|||
return segment.GetID(), size
|
||||
})
|
||||
|
||||
threshold := paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.GetAsInt64() * 1024 * 1024
|
||||
threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsInt64() * 1024 * 1024
|
||||
if totalSize >= threshold {
|
||||
target := lo.MaxBy(growingSegments, func(s1, s2 *SegmentInfo) bool {
|
||||
return sizeMap[s1.GetID()] > sizeMap[s2.GetID()]
|
||||
|
|
|
@ -198,8 +198,8 @@ func Test_sealLongTimeIdlePolicy(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_sealByTotalGrowingSegmentsSize(t *testing.T) {
|
||||
paramtable.Get().Save(paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.Key, "100")
|
||||
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.Key)
|
||||
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key, "100")
|
||||
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key)
|
||||
|
||||
seg0 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 0,
|
||||
|
|
|
@ -2885,7 +2885,7 @@ type dataCoordConfig struct {
|
|||
SegmentMaxIdleTime ParamItem `refreshable:"false"`
|
||||
SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
|
||||
SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"`
|
||||
TotalGrowingSizeThresholdInMB ParamItem `refreshable:"true"`
|
||||
GrowingSegmentsMemSizeInMB ParamItem `refreshable:"true"`
|
||||
AutoUpgradeSegmentIndex ParamItem `refreshable:"true"`
|
||||
SegmentFlushInterval ParamItem `refreshable:"true"`
|
||||
|
||||
|
@ -3126,15 +3126,15 @@ the number of binlog file reaches to max value.`,
|
|||
}
|
||||
p.SegmentMaxBinlogFileNumber.Init(base.mgr)
|
||||
|
||||
p.TotalGrowingSizeThresholdInMB = ParamItem{
|
||||
Key: "dataCoord.segment.totalGrowingSizeThresholdInMB",
|
||||
p.GrowingSegmentsMemSizeInMB = ParamItem{
|
||||
Key: "dataCoord.sealPolicy.channel.growingSegmentsMemSize",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "4096",
|
||||
Doc: `The size threshold in MB, if the total size of growing segments
|
||||
Doc: `The size threshold in MB, if the total size of growing segments of each shard
|
||||
exceeds this threshold, the largest growing segment will be sealed.`,
|
||||
Export: true,
|
||||
}
|
||||
p.TotalGrowingSizeThresholdInMB.Init(base.mgr)
|
||||
p.GrowingSegmentsMemSizeInMB.Init(base.mgr)
|
||||
|
||||
p.EnableCompaction = ParamItem{
|
||||
Key: "dataCoord.enableCompaction",
|
||||
|
|
|
@ -439,7 +439,7 @@ func TestComponentParam(t *testing.T) {
|
|||
assert.True(t, Params.EnableGarbageCollection.GetAsBool())
|
||||
assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false)
|
||||
t.Logf("dataCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool())
|
||||
assert.Equal(t, int64(4096), Params.TotalGrowingSizeThresholdInMB.GetAsInt64())
|
||||
assert.Equal(t, int64(4096), Params.GrowingSegmentsMemSizeInMB.GetAsInt64())
|
||||
|
||||
assert.Equal(t, true, Params.AutoBalance.GetAsBool())
|
||||
assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt())
|
||||
|
|
|
@ -37,8 +37,8 @@ import (
|
|||
)
|
||||
|
||||
func (s *SealSuite) TestSealByTotalGrowingSegmentsSize() {
|
||||
paramtable.Get().Save(paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.Key, "10")
|
||||
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.Key)
|
||||
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key, "10")
|
||||
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key)
|
||||
|
||||
paramtable.Get().Save(paramtable.Get().DataNodeCfg.SyncPeriod.Key, "5")
|
||||
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.SyncPeriod.Key)
|
||||
|
|
Loading…
Reference in New Issue