Fix small segment compaction (#21327)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/25937/head
xige-16 2023-07-26 14:49:01 +08:00 committed by GitHub
parent b5e79e7f34
commit 6f18587f35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 6 deletions

View File

@ -338,7 +338,7 @@ dataCoord:
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
# (smallProportion * segment max # of rows). # (smallProportion * segment max # of rows).
# A compaction will happen on small segments if the segment after compaction will have # A compaction will happen on small segments if the segment after compaction will have
compactableProportion: 0.5 compactableProportion: 0.85
# over (compactableProportion * segment max # of rows) rows. # over (compactableProportion * segment max # of rows) rows.
# MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!! # MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!!
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%. # During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.

View File

@ -654,8 +654,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
} }
// only merge if the candidate number is larger than MinSegmentToMerge or if the target row count is large enough // only merge if the candidate number is larger than MinSegmentToMerge or if the target row count is large enough
if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() || if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
len(bucket) > 1 && len(bucket) > 1 && t.isCompactableSegment(targetRow, segment) {
targetRow > int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()) {
plan := segmentsToPlan(bucket, compactTime) plan := segmentsToPlan(bucket, compactTime)
log.Info("generate a plan for small candidates", log.Info("generate a plan for small candidates",
zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)), zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)),
@ -798,6 +797,18 @@ func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo) bool {
return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat()) return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat())
} }
// isCompactableSegment reports whether targetRow is strictly greater than the
// compaction threshold for segment. The threshold is segment's max row number
// multiplied by the effective compactable proportion, truncated to int64.
func (t *compactionTrigger) isCompactableSegment(targetRow int64, segment *SegmentInfo) bool {
	proportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()
	// Never let the effective proportion fall below the configured small-segment
	// proportion; per the original author's note this avoids invalid single
	// segment compaction.
	if floor := Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat(); proportion < floor {
		proportion = floor
	}
	threshold := int64(float64(segment.GetMaxRowNum()) * proportion)
	return targetRow > threshold
}
func isExpandableSmallSegment(segment *SegmentInfo) bool { func isExpandableSmallSegment(segment *SegmentInfo) bool {
return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()-1)) return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()-1))
} }

View File

@ -1156,8 +1156,8 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
spy := (tt.fields.compactionHandler).(*spyCompactionHandler) spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
select { select {
case val := <-spy.spyChan: case val := <-spy.spyChan:
// 5 segments in the final pick list // 6 segments in the final pick list
assert.Equal(t, len(val.SegmentBinlogs), 5) assert.Equal(t, len(val.SegmentBinlogs), 6)
return return
case <-time.After(3 * time.Second): case <-time.After(3 * time.Second):
assert.Fail(t, "failed to get plan") assert.Fail(t, "failed to get plan")

View File

@ -2177,7 +2177,7 @@ the number of binlog file reaches to max value.`,
p.SegmentCompactableProportion = ParamItem{ p.SegmentCompactableProportion = ParamItem{
Key: "dataCoord.segment.compactableProportion", Key: "dataCoord.segment.compactableProportion",
Version: "2.2.1", Version: "2.2.1",
DefaultValue: "0.5", DefaultValue: "0.85",
Doc: `(smallProportion * segment max # of rows). Doc: `(smallProportion * segment max # of rows).
A compaction will happen on small segments if the segment after compaction will have`, A compaction will happen on small segments if the segment after compaction will have`,
Export: true, Export: true,