fix: [cp24] Exclude L0 compaction when clustering is executing (#37142)

Also remove the conflict check when executing L0 compaction. Exclusivity is
already guaranteed by the scheduler.

See also: #37140
pr: #37141

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-10-28 15:01:30 +08:00 committed by GitHub
parent 223badc482
commit 4cb5b2c3b5
3 changed files with 3 additions and 75 deletions
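Context for the first hunk below: the scheduler already excludes L0 delete compactions on channels where a mix compaction is executing, and this patch adds clustering compactions to that exclusion. The following is a minimal, self-contained sketch of that rule; the task and channel types, the map-based exclude sets, and the schedule helper are simplified stand-ins, not the production datacoord code.

// schedule_sketch.go: a runnable model of the channel-exclusion rule, using
// simplified types instead of CompactionTask and the real typeutil sets.
package main

import "fmt"

type task struct {
	kind    string // "L0", "Mix", or "Clustering" (stand-in for datapb.CompactionType)
	channel string
}

// schedule returns the queued tasks that may start, skipping L0 tasks whose
// channel is already occupied by an executing mix or clustering compaction.
func schedule(queued, executing []task) []task {
	mixChannelExcludes := map[string]bool{}
	clusterChannelExcludes := map[string]bool{}
	for _, t := range executing {
		switch t.kind {
		case "Mix":
			mixChannelExcludes[t.channel] = true
		case "Clustering":
			clusterChannelExcludes[t.channel] = true
		}
	}

	var picked []task
	for _, t := range queued {
		if t.kind == "L0" && (mixChannelExcludes[t.channel] || clusterChannelExcludes[t.channel]) {
			continue // excluded: a conflicting compaction already runs on this channel
		}
		picked = append(picked, t)
	}
	return picked
}

func main() {
	executing := []task{{kind: "Clustering", channel: "ch-1"}}
	queued := []task{{kind: "L0", channel: "ch-1"}, {kind: "L0", channel: "ch-2"}}
	// Only the ch-2 task is scheduled; the ch-1 L0 task waits for clustering to finish.
	fmt.Println(schedule(queued, executing))
}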


@@ -239,7 +239,8 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
 		switch t.GetType() {
 		case datapb.CompactionType_Level0DeleteCompaction:
-			if mixChannelExcludes.Contain(t.GetChannel()) {
+			if mixChannelExcludes.Contain(t.GetChannel()) ||
+				clusterChannelExcludes.Contain(t.GetChannel()) {
 				excluded = append(excluded, t)
 				continue
 			}
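A hedged sketch of a unit test for the combined condition above, assuming a hypothetical shouldExcludeL0 helper that mirrors the check in the hunk; it is illustrative only and not part of the Milvus test suite (place it in a _test.go file).

package l0sketch

import "testing"

// shouldExcludeL0 mirrors the scheduling condition: an L0 task is excluded
// when its channel appears in either exclude set.
func shouldExcludeL0(channel string, mixExcludes, clusterExcludes map[string]bool) bool {
	return mixExcludes[channel] || clusterExcludes[channel]
}

func TestL0ExcludedWhileClusteringRuns(t *testing.T) {
	mix := map[string]bool{}
	cluster := map[string]bool{"ch-1": true} // clustering compaction running on ch-1

	if !shouldExcludeL0("ch-1", mix, cluster) {
		t.Fatal("L0 task on ch-1 should be excluded while clustering runs")
	}
	if shouldExcludeL0("ch-2", mix, cluster) {
		t.Fatal("L0 task on ch-2 should be schedulable")
	}
}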


@@ -328,19 +328,11 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
 	}))
 	if len(sealedSegments) == 0 {
-		// TO-DO fast finish l0 segment, just drop l0 segment
+		// TODO fast finish l0 segment, just drop l0 segment
 		log.Info("l0Compaction available non-L0 Segments is empty ")
 		return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", t.GetPos())
 	}
-	for _, segInfo := range sealedSegments {
-		// TODO should allow parallel executing of l0 compaction
-		if segInfo.isCompacting {
-			log.Warn("l0CompactionTask candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID()))
-			return nil, merr.WrapErrCompactionPlanConflict(fmt.Sprintf("segment %d is compacting", segInfo.GetID()))
-		}
-	}
 	sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
 		return &datapb.CompactionSegmentBinlogs{
 			SegmentID: info.GetID(),
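With the per-segment isCompacting loop removed, BuildCompactionRequest only has to map the selected sealed segments to binlog descriptors; channel-level conflicts are rejected earlier by the scheduler. Below is a small self-contained sketch of that mapping step using github.com/samber/lo, with simplified stand-ins for SegmentInfo and CompactionSegmentBinlogs.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// Simplified stand-ins for the datapb types used in the real code.
type segmentInfo struct {
	ID    int64
	Level string
}

type compactionSegmentBinlogs struct {
	SegmentID int64
	Level     string
}

func main() {
	sealedSegments := []segmentInfo{{ID: 200, Level: "L1"}, {ID: 201, Level: "L2"}}

	// No per-segment isCompacting check here: the scheduler already guarantees
	// that no conflicting compaction runs on this channel.
	sealedSegBinlogs := lo.Map(sealedSegments, func(info segmentInfo, _ int) compactionSegmentBinlogs {
		return compactionSegmentBinlogs{SegmentID: info.ID, Level: info.Level}
	})
	fmt.Println(sealedSegBinlogs)
}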


@@ -177,71 +177,6 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
 		s.EqualValues(NullNodeID, t.NodeID)
 	})
-	s.Run("test pipelining BuildCompactionRequest failed", func() {
-		t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
-		t.NodeID = 100
-		channel := "ch-1"
-		deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
-		s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
-			[]*SegmentInfo{
-				{SegmentInfo: &datapb.SegmentInfo{
-					ID: 200,
-					Level: datapb.SegmentLevel_L1,
-					InsertChannel: channel,
-				}, isCompacting: true},
-			},
-		)
-		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
-			return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
-				ID: segID,
-				Level: datapb.SegmentLevel_L0,
-				InsertChannel: channel,
-				State: commonpb.SegmentState_Flushed,
-				Deltalogs: deltaLogs,
-			}}
-		}).Twice()
-		s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
-		s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return()
-		s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
-		got := t.Process()
-		s.True(got)
-		s.Equal(datapb.CompactionTaskState_cleaned, t.State)
-	})
-	s.Run("test pipelining saveTaskMeta failed", func() {
-		t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
-		t.NodeID = 100
-		channel := "ch-1"
-		deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
-		s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
-			[]*SegmentInfo{
-				{SegmentInfo: &datapb.SegmentInfo{
-					ID: 200,
-					Level: datapb.SegmentLevel_L1,
-					InsertChannel: channel,
-				}, isCompacting: true},
-			},
-		)
-		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
-			return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
-				ID: segID,
-				Level: datapb.SegmentLevel_L0,
-				InsertChannel: channel,
-				State: commonpb.SegmentState_Flushed,
-				Deltalogs: deltaLogs,
-			}}
-		}).Twice()
-		s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
-		got := t.Process()
-		s.False(got)
-		s.Equal(datapb.CompactionTaskState_pipelining, t.State)
-	})
 	s.Run("test pipelining Compaction failed", func() {
 		t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
 		t.NodeID = 100