fix: Skip invalid compaction plan (#31045)

See also #31044

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/31089/head
congqixia 2024-03-06 21:36:59 +08:00 committed by GitHub
parent 007fab183c
commit 196f0c1e1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 178 additions and 44 deletions

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -299,17 +300,20 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
return nil
}
func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {
plan := task.plan
log := log.With(zap.Int64("taskID", task.triggerInfo.id), zap.Int64("planID", plan.GetPlanID()))
if plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
// Fill in deltalogs for L0 segments
lo.ForEach(plan.SegmentBinlogs, func(seg *datapb.CompactionSegmentBinlogs, _ int) {
for _, seg := range plan.GetSegmentBinlogs() {
if seg.GetLevel() == datapb.SegmentLevel_L0 {
segInfo := c.meta.GetHealthySegment(seg.GetSegmentID())
if segInfo == nil {
return merr.WrapErrSegmentNotFound(seg.GetSegmentID())
}
seg.Deltalogs = segInfo.GetDeltalogs()
}
})
}
// Select sealed L1 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos
@ -338,27 +342,37 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
log.Info("Compaction handler refreshed level zero compaction plan",
zap.Any("target position", task.triggerInfo.pos),
zap.Any("target segments count", len(sealedSegBinlogs)))
return
return nil
}
if plan.GetType() == datapb.CompactionType_MixCompaction {
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
for _, seg := range plan.GetSegmentBinlogs() {
if info := c.meta.GetHealthySegment(seg.GetSegmentID()); info != nil {
seg.Deltalogs = info.GetDeltalogs()
segIDMap[seg.SegmentID] = info.GetDeltalogs()
info := c.meta.GetHealthySegment(seg.GetSegmentID())
if info == nil {
return merr.WrapErrSegmentNotFound(seg.GetSegmentID())
}
seg.Deltalogs = info.GetDeltalogs()
segIDMap[seg.SegmentID] = info.GetDeltalogs()
}
log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap))
return
}
return nil
}
func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
for _, task := range tasks {
// avoid closure capture iteration variable
innerTask := task
c.RefreshPlan(innerTask)
err := c.RefreshPlan(innerTask)
if err != nil {
c.updateTask(innerTask.plan.GetPlanID(), setState(failed), endSpan())
c.scheduler.Finish(innerTask.dataNodeID, innerTask.plan)
log.Warn("failed to refresh task",
zap.Int64("plan", task.plan.PlanID),
zap.Error(err))
continue
}
getOrCreateIOPool().Submit(func() (any, error) {
ctx := tracer.SetupSpan(context.Background(), innerTask.span)
plan := innerTask.plan

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -221,50 +222,169 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
}},
},
)
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: segID,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
State: commonpb.SegmentState_Flushed,
Deltalogs: deltalogs,
}}
})
// 2 l0 segments
plan := &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
s.Run("normal_refresh", func() {
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: segID,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
State: commonpb.SegmentState_Flushed,
Deltalogs: deltalogs,
}}
}).Times(2)
// 2 l0 segments
plan := &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
{
SegmentID: 101,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
},
{
SegmentID: 101,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
},
Type: datapb.CompactionType_Level0DeleteCompaction,
}
Type: datapb.CompactionType_Level0DeleteCompaction,
}
task := &compactionTask{
triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
state: executing,
plan: plan,
dataNodeID: 1,
}
task := &compactionTask{
triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
state: executing,
plan: plan,
dataNodeID: 1,
}
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
handler.RefreshPlan(task)
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
err := handler.RefreshPlan(task)
s.Require().NoError(err)
s.Equal(5, len(task.plan.GetSegmentBinlogs()))
segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 {
return b.GetSegmentID()
s.Equal(5, len(task.plan.GetSegmentBinlogs()))
segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 {
return b.GetSegmentID()
})
s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs)
})
s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs)
s.Run("segment_not_found", func() {
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return nil
}).Once()
plan := &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 102,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
},
Type: datapb.CompactionType_Level0DeleteCompaction,
}
task := &compactionTask{
triggerInfo: &compactionSignal{id: 19531, collectionID: 1, partitionID: 10},
state: executing,
plan: plan,
dataNodeID: 1,
}
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
err := handler.RefreshPlan(task)
s.Error(err)
s.ErrorIs(err, merr.ErrSegmentNotFound)
})
}
func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() {
channel := "Ch-1"
binlogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
s.Run("normal_refresh", func() {
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: segID,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs,
}}
}).Times(2)
// 2 l0 segments
plan := &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 200,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
},
{
SegmentID: 201,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
},
},
Type: datapb.CompactionType_MixCompaction,
}
task := &compactionTask{
triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
state: executing,
plan: plan,
dataNodeID: 1,
}
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
err := handler.RefreshPlan(task)
s.Require().NoError(err)
s.Equal(2, len(task.plan.GetSegmentBinlogs()))
segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 {
return b.GetSegmentID()
})
s.ElementsMatch([]int64{200, 201}, segIDs)
})
s.Run("segment_not_found", func() {
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return nil
}).Once()
// 2 l0 segments
plan := &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 200,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
},
{
SegmentID: 201,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
},
},
Type: datapb.CompactionType_MixCompaction,
}
task := &compactionTask{
triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
state: executing,
plan: plan,
dataNodeID: 1,
}
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
err := handler.RefreshPlan(task)
s.Error(err)
s.ErrorIs(err, merr.ErrSegmentNotFound)
})
}
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {