mirror of https://github.com/milvus-io/milvus.git
fix: Exlude L0 compaction when clustering is executing (#37141)
Also remove conflit check when executing L0. The exclusive is already guarenteed in scheduler See also: #37140 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/37206/head
parent
1e75a42053
commit
26028f4137
|
@ -248,7 +248,8 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
|
|||
|
||||
switch t.GetTaskProto().GetType() {
|
||||
case datapb.CompactionType_Level0DeleteCompaction:
|
||||
if mixChannelExcludes.Contain(t.GetTaskProto().GetChannel()) {
|
||||
if mixChannelExcludes.Contain(t.GetTaskProto().GetChannel()) ||
|
||||
clusterChannelExcludes.Contain(t.GetTaskProto().GetChannel()) {
|
||||
excluded = append(excluded, t)
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -296,19 +296,11 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
|
|||
}))
|
||||
|
||||
if len(sealedSegments) == 0 {
|
||||
// TO-DO fast finish l0 segment, just drop l0 segment
|
||||
// TODO fast finish l0 segment, just drop l0 segment
|
||||
log.Info("l0Compaction available non-L0 Segments is empty ")
|
||||
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
|
||||
}
|
||||
|
||||
for _, segInfo := range sealedSegments {
|
||||
// TODO should allow parallel executing of l0 compaction
|
||||
if segInfo.isCompacting {
|
||||
log.Warn("l0CompactionTask candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID()))
|
||||
return nil, merr.WrapErrCompactionPlanConflict(fmt.Sprintf("segment %d is compacting", segInfo.GetID()))
|
||||
}
|
||||
}
|
||||
|
||||
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
|
||||
return &datapb.CompactionSegmentBinlogs{
|
||||
SegmentID: info.GetID(),
|
||||
|
|
|
@ -221,74 +221,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
|
|||
s.EqualValues(NullNodeID, t.GetTaskProto().NodeID)
|
||||
})
|
||||
|
||||
s.Run("test pipelining BuildCompactionRequest failed", func() {
|
||||
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
|
||||
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
|
||||
t.updateAndSaveTaskMeta(setNodeID(100))
|
||||
channel := "ch-1"
|
||||
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
|
||||
|
||||
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
|
||||
[]*SegmentInfo{
|
||||
{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 200,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
InsertChannel: channel,
|
||||
}, isCompacting: true},
|
||||
},
|
||||
)
|
||||
|
||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
|
||||
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: segID,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
InsertChannel: channel,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Deltalogs: deltaLogs,
|
||||
}}
|
||||
}).Twice()
|
||||
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return()
|
||||
|
||||
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
|
||||
got := t.Process()
|
||||
s.True(got)
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().State)
|
||||
})
|
||||
s.Run("test pipelining saveTaskMeta failed", func() {
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
|
||||
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
|
||||
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
|
||||
t.updateAndSaveTaskMeta(setNodeID(100))
|
||||
channel := "ch-1"
|
||||
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
|
||||
|
||||
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
|
||||
[]*SegmentInfo{
|
||||
{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 200,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
InsertChannel: channel,
|
||||
}, isCompacting: true},
|
||||
},
|
||||
)
|
||||
|
||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
|
||||
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: segID,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
InsertChannel: channel,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Deltalogs: deltaLogs,
|
||||
}}
|
||||
}).Twice()
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
|
||||
got := t.Process()
|
||||
s.False(got)
|
||||
s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().State)
|
||||
})
|
||||
|
||||
s.Run("test pipelining Compaction failed", func() {
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
|
||||
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
|
||||
|
|
Loading…
Reference in New Issue