mirror of https://github.com/milvus-io/milvus.git
enhance: Add force trigger (#30641)
1. Increase maxCount of L0 compaction tasks to 30 This could reduce the l0 compaction task number by 30% for high-frequently-generated-small l0 segments, with the maximum size 64MB stay not changed. So that l0 segments would accumulate slower and decrease the mem presure caused by L0 segment for QueryNode 2. Add force Trigger for later manual timely l0 compaction triggers. See also: #30191, #30556 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/30675/head
parent
564b12c661
commit
44d436d0b6
|
@ -74,28 +74,8 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) {
|
|||
return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp()
|
||||
})
|
||||
|
||||
var (
|
||||
minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat()
|
||||
maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat()
|
||||
minDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.GetAsInt()
|
||||
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
|
||||
)
|
||||
|
||||
targetViews, targetSize := v.filterViewsBySizeRange(validSegments, minDeltaSize, maxDeltaSize)
|
||||
if targetViews != nil {
|
||||
reason := fmt.Sprintf("level zero segments size reaches compaction limit, curDeltaSize=%.2f, limitSizeRange=[%.2f, %.2f]",
|
||||
targetSize, minDeltaSize, maxDeltaSize)
|
||||
return &LevelZeroSegmentsView{
|
||||
label: v.label,
|
||||
segments: targetViews,
|
||||
earliestGrowingSegmentPos: v.earliestGrowingSegmentPos,
|
||||
}, reason
|
||||
}
|
||||
|
||||
targetViews, targetCount := v.filterViewsByCountRange(validSegments, minDeltaCount, maxDeltaCount)
|
||||
if targetViews != nil {
|
||||
reason := fmt.Sprintf("level zero segments count reaches compaction limit, curDeltaCount=%d, limitCountRange=[%d, %d]",
|
||||
targetCount, minDeltaCount, maxDeltaCount)
|
||||
targetViews, reason := v.minCountSizeTrigger(validSegments)
|
||||
if len(targetViews) > 0 {
|
||||
return &LevelZeroSegmentsView{
|
||||
label: v.label,
|
||||
segments: targetViews,
|
||||
|
@ -106,44 +86,68 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) {
|
|||
return nil, ""
|
||||
}
|
||||
|
||||
// filterViewByCountRange picks segment views that total sizes in range [minCount, maxCount]
|
||||
func (v *LevelZeroSegmentsView) filterViewsByCountRange(segments []*SegmentView, minCount, maxCount int) ([]*SegmentView, int) {
|
||||
curDeltaCount := 0
|
||||
// minCountSizeTrigger tries to trigger LevelZeroCompaction when segmentViews reaches minimum trigger conditions:
|
||||
// 1. count >= minDeltaCount, OR
|
||||
// 2. size >= minDeltaSize
|
||||
func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) {
|
||||
var (
|
||||
minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat()
|
||||
maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat()
|
||||
minDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.GetAsInt()
|
||||
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
|
||||
)
|
||||
|
||||
curSize := float64(0)
|
||||
|
||||
// count >= minDeltaCount
|
||||
if lo.SumBy(segments, func(view *SegmentView) int { return view.DeltalogCount }) >= minDeltaCount {
|
||||
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
|
||||
reason = fmt.Sprintf("level zero segments count reaches minForceTriggerCountLimit=%d, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaCount, curSize, len(segments))
|
||||
return
|
||||
}
|
||||
|
||||
// size >= minDeltaSize
|
||||
if lo.SumBy(segments, func(view *SegmentView) float64 { return view.DeltaSize }) >= minDeltaSize {
|
||||
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
|
||||
reason = fmt.Sprintf("level zero segments size reaches minForceTriggerSizeLimit=%.2f, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaSize, curSize, len(segments))
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// forceTrigger tries to trigger LevelZeroCompaction even when segmentsViews don't meet the minimum condition,
|
||||
// the picked plan is still satisfied with the maximum condition
|
||||
func (v *LevelZeroSegmentsView) forceTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) {
|
||||
var (
|
||||
maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat()
|
||||
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
|
||||
)
|
||||
|
||||
curSize := float64(0)
|
||||
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
|
||||
reason = fmt.Sprintf("level zero views force to trigger, curDeltaSize=%.2f, curDeltaCount=%d", curSize, len(segments))
|
||||
return
|
||||
}
|
||||
|
||||
// pickByMaxCountSize picks segments that count <= maxCount or size <= maxSize
|
||||
func pickByMaxCountSize(segments []*SegmentView, maxSize float64, maxCount int) ([]*SegmentView, float64) {
|
||||
var (
|
||||
curDeltaCount = 0
|
||||
curDeltaSize = float64(0)
|
||||
)
|
||||
idx := 0
|
||||
for _, view := range segments {
|
||||
targetCount := view.DeltalogCount + curDeltaCount
|
||||
if idx != 0 && targetCount > maxCount {
|
||||
break
|
||||
}
|
||||
|
||||
idx += 1
|
||||
curDeltaCount = targetCount
|
||||
}
|
||||
|
||||
if curDeltaCount < minCount {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
return segments[:idx], curDeltaCount
|
||||
}
|
||||
|
||||
// filterViewBySizeRange picks segment views that total count in range [minSize, maxSize]
|
||||
func (v *LevelZeroSegmentsView) filterViewsBySizeRange(segments []*SegmentView, minSize, maxSize float64) ([]*SegmentView, float64) {
|
||||
var curDeltaSize float64
|
||||
idx := 0
|
||||
for _, view := range segments {
|
||||
targetSize := view.DeltaSize + curDeltaSize
|
||||
if idx != 0 && targetSize > maxSize {
|
||||
|
||||
if (curDeltaCount != 0 && curDeltaSize != float64(0)) && (targetSize > maxSize || targetCount > maxCount) {
|
||||
break
|
||||
}
|
||||
|
||||
idx += 1
|
||||
curDeltaCount = targetCount
|
||||
curDeltaSize = targetSize
|
||||
idx += 1
|
||||
}
|
||||
|
||||
if curDeltaSize < minSize {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
return segments[:idx], curDeltaSize
|
||||
}
|
||||
|
|
|
@ -171,3 +171,86 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LevelZeroSegmentsViewSuite) TestMinCountSizeTrigger() {
|
||||
label := s.v.GetGroupLabel()
|
||||
tests := []struct {
|
||||
description string
|
||||
segIDs []int64
|
||||
segCounts []int
|
||||
segSize []float64
|
||||
|
||||
expectedIDs []int64
|
||||
}{
|
||||
{"donot trigger", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{1, 1, 1}, nil},
|
||||
{"trigger by count=15", []int64{100, 101, 102}, []int{5, 5, 5}, []float64{1, 1, 1}, []int64{100, 101, 102}},
|
||||
{"trigger by count=10", []int64{100, 101, 102}, []int{5, 3, 2}, []float64{1, 1, 1}, []int64{100, 101, 102}},
|
||||
{"trigger by count=50", []int64{100, 101, 102}, []int{32, 10, 8}, []float64{1, 1, 1}, []int64{100}},
|
||||
{"trigger by size=24MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{8 * 1024 * 1024, 8 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100, 101, 102}},
|
||||
{"trigger by size=8MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{3 * 1024 * 1024, 3 * 1024 * 1024, 2 * 1024 * 1024}, []int64{100, 101, 102}},
|
||||
{"trigger by size=128MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{100 * 1024 * 1024, 20 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100}},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
s.Run(test.description, func() {
|
||||
views := []*SegmentView{}
|
||||
for idx, ID := range test.segIDs {
|
||||
seg := genTestL0SegmentView(ID, label, 10000)
|
||||
seg.DeltaSize = test.segSize[idx]
|
||||
seg.DeltalogCount = test.segCounts[idx]
|
||||
|
||||
views = append(views, seg)
|
||||
}
|
||||
|
||||
picked, reason := s.v.minCountSizeTrigger(views)
|
||||
s.ElementsMatch(lo.Map(picked, func(view *SegmentView, _ int) int64 {
|
||||
return view.ID
|
||||
}), test.expectedIDs)
|
||||
|
||||
if len(picked) > 0 {
|
||||
s.NotEmpty(reason)
|
||||
}
|
||||
|
||||
log.Info("test minCountSizeTrigger", zap.Any("trigger reason", reason))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LevelZeroSegmentsViewSuite) TestForceTrigger() {
|
||||
label := s.v.GetGroupLabel()
|
||||
tests := []struct {
|
||||
description string
|
||||
segIDs []int64
|
||||
segCounts []int
|
||||
segSize []float64
|
||||
|
||||
expectedIDs []int64
|
||||
}{
|
||||
{"force trigger", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{1, 1, 1}, []int64{100, 101, 102}},
|
||||
{"trigger by count=15", []int64{100, 101, 102}, []int{5, 5, 5}, []float64{1, 1, 1}, []int64{100, 101, 102}},
|
||||
{"trigger by count=10", []int64{100, 101, 102}, []int{5, 3, 2}, []float64{1, 1, 1}, []int64{100, 101, 102}},
|
||||
{"trigger by count=50", []int64{100, 101, 102}, []int{32, 10, 8}, []float64{1, 1, 1}, []int64{100}},
|
||||
{"trigger by size=24MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{8 * 1024 * 1024, 8 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100, 101, 102}},
|
||||
{"trigger by size=8MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{3 * 1024 * 1024, 3 * 1024 * 1024, 2 * 1024 * 1024}, []int64{100, 101, 102}},
|
||||
{"trigger by size=128MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{100 * 1024 * 1024, 20 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100}},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
s.Run(test.description, func() {
|
||||
views := []*SegmentView{}
|
||||
for idx, ID := range test.segIDs {
|
||||
seg := genTestL0SegmentView(ID, label, 10000)
|
||||
seg.DeltaSize = test.segSize[idx]
|
||||
seg.DeltalogCount = test.segCounts[idx]
|
||||
|
||||
views = append(views, seg)
|
||||
}
|
||||
|
||||
picked, reason := s.v.forceTrigger(views)
|
||||
s.ElementsMatch(lo.Map(picked, func(view *SegmentView, _ int) int64 {
|
||||
return view.ID
|
||||
}), test.expectedIDs)
|
||||
log.Info("test forceTrigger", zap.Any("trigger reason", reason))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2742,8 +2742,8 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
p.LevelZeroCompactionTriggerDeltalogMaxNum = ParamItem{
|
||||
Key: "dataCoord.compaction.levelzero.forceTrigger.deltalogMaxNum",
|
||||
Version: "2.4.0",
|
||||
Doc: "The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 20",
|
||||
DefaultValue: "20",
|
||||
Doc: "The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30",
|
||||
DefaultValue: "30",
|
||||
}
|
||||
p.LevelZeroCompactionTriggerDeltalogMaxNum.Init(base.mgr)
|
||||
|
||||
|
|
Loading…
Reference in New Issue