mirror of https://github.com/milvus-io/milvus.git
enhance: [cherry-pick]Skip submmit empty l0 tasks in DC (#31338)
pr: #31280 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/31385/head
parent
0dbc0f2c11
commit
8946aa10d4
|
@ -332,6 +332,9 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {
|
|||
info.GetLevel() != datapb.SegmentLevel_L0 &&
|
||||
info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp()
|
||||
})
|
||||
if len(sealedSegments) == 0 {
|
||||
return errors.Errorf("Selected zero L1/L2 segments for the position=%v", task.triggerInfo.pos)
|
||||
}
|
||||
|
||||
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
|
||||
return &datapb.CompactionSegmentBinlogs{
|
||||
|
|
|
@ -298,6 +298,49 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
|
|||
s.Error(err)
|
||||
s.ErrorIs(err, merr.ErrSegmentNotFound)
|
||||
})
|
||||
|
||||
s.Run("select zero segments", func() {
|
||||
s.SetupTest()
|
||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
|
||||
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: segID,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
InsertChannel: channel,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Deltalogs: deltalogs,
|
||||
}}
|
||||
}).Times(2)
|
||||
s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(nil).Once()
|
||||
|
||||
// 2 l0 segments
|
||||
plan := &datapb.CompactionPlan{
|
||||
PlanID: 1,
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: 100,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
InsertChannel: channel,
|
||||
},
|
||||
{
|
||||
SegmentID: 101,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
InsertChannel: channel,
|
||||
},
|
||||
},
|
||||
Type: datapb.CompactionType_Level0DeleteCompaction,
|
||||
}
|
||||
|
||||
task := &compactionTask{
|
||||
triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
|
||||
state: executing,
|
||||
plan: plan,
|
||||
dataNodeID: 1,
|
||||
}
|
||||
|
||||
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
|
||||
err := handler.RefreshPlan(task)
|
||||
s.Error(err)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() {
|
||||
|
|
Loading…
Reference in New Issue