mirror of https://github.com/milvus-io/milvus.git
fix: Skip to submit l0 tasks when scheduler full (#31270)
See also: #31242 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/31240/head
parent
147a3b8bdc
commit
a1386bae7f
|
@ -51,6 +51,11 @@ func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext)
|
|||
func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionTriggerType, views []CompactionView) {
|
||||
log := log.With(zap.Int64("taskID", taskID))
|
||||
for _, view := range views {
|
||||
if m.handler.isFull() {
|
||||
log.RatedInfo(1.0, "Skip trigger compaction for scheduler is full")
|
||||
return
|
||||
}
|
||||
|
||||
switch eventType {
|
||||
case TriggerTypeLevelZeroViewChange:
|
||||
log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange")
|
||||
|
|
|
@ -43,6 +43,33 @@ func (s *CompactionTriggerManagerSuite) SetupTest() {
|
|||
s.m = NewCompactionTriggerManager(s.mockAlloc, s.mockPlanContext)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() {
|
||||
s.mockPlanContext.EXPECT().isFull().Return(true)
|
||||
viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator)
|
||||
collSegs := s.meta.GetCompactableSegmentGroupByCollection()
|
||||
|
||||
segments, found := collSegs[1]
|
||||
s.Require().True(found)
|
||||
|
||||
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
|
||||
return info.GetLevel() == datapb.SegmentLevel_L0
|
||||
})
|
||||
|
||||
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
|
||||
s.Require().NotEmpty(latestL0Segments)
|
||||
needRefresh, levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
s.Require().True(needRefresh)
|
||||
s.Require().Equal(1, len(levelZeroView))
|
||||
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
|
||||
s.True(ok)
|
||||
s.NotNil(cView)
|
||||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.m.Notify(19530, TriggerTypeLevelZeroViewChange, levelZeroView)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
||||
viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator)
|
||||
collSegs := s.meta.GetCompactableSegmentGroupByCollection()
|
||||
|
@ -70,6 +97,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
|||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockPlanContext.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).
|
||||
Run(func(signal *compactionSignal, plan *datapb.CompactionPlan) {
|
||||
s.EqualValues(19530, signal.id)
|
||||
|
@ -117,6 +145,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
|||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockPlanContext.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).
|
||||
Run(func(signal *compactionSignal, plan *datapb.CompactionPlan) {
|
||||
s.EqualValues(19530, signal.id)
|
||||
|
|
Loading…
Reference in New Issue