From 196f0c1e1dc2aba003b4cb0173d5d245c970acfe Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 6 Mar 2024 21:36:59 +0800 Subject: [PATCH] fix: Skip invalid compaction plan (#31045) See also #31044 --------- Signed-off-by: Congqi Xia --- internal/datacoord/compaction.go | 32 +++-- internal/datacoord/compaction_test.go | 190 +++++++++++++++++++++----- 2 files changed, 178 insertions(+), 44 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 69c1384e3e..c5f4ffb40e 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -299,17 +300,20 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data return nil } -func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) { +func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error { plan := task.plan log := log.With(zap.Int64("taskID", task.triggerInfo.id), zap.Int64("planID", plan.GetPlanID())) if plan.GetType() == datapb.CompactionType_Level0DeleteCompaction { // Fill in deltalogs for L0 segments - lo.ForEach(plan.SegmentBinlogs, func(seg *datapb.CompactionSegmentBinlogs, _ int) { + for _, seg := range plan.GetSegmentBinlogs() { if seg.GetLevel() == datapb.SegmentLevel_L0 { segInfo := c.meta.GetHealthySegment(seg.GetSegmentID()) + if segInfo == nil { + return merr.WrapErrSegmentNotFound(seg.GetSegmentID()) + } seg.Deltalogs = segInfo.GetDeltalogs() } - }) + } // Select sealed L1 segments for LevelZero compaction that meets the condition: // dmlPos < triggerInfo.pos @@ -338,27 +342,37 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) { log.Info("Compaction handler refreshed level zero compaction plan", zap.Any("target position", task.triggerInfo.pos), zap.Any("target segments count", len(sealedSegBinlogs))) - return + return nil } if plan.GetType() == datapb.CompactionType_MixCompaction { segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs)) for _, seg := range plan.GetSegmentBinlogs() { - if info := c.meta.GetHealthySegment(seg.GetSegmentID()); info != nil { - seg.Deltalogs = info.GetDeltalogs() - segIDMap[seg.SegmentID] = info.GetDeltalogs() + info := c.meta.GetHealthySegment(seg.GetSegmentID()) + if info == nil { + return merr.WrapErrSegmentNotFound(seg.GetSegmentID()) } + seg.Deltalogs = info.GetDeltalogs() + segIDMap[seg.SegmentID] = info.GetDeltalogs() } log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap)) - return } + return nil } func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { for _, task := range tasks { // avoid closure capture iteration variable innerTask := task - c.RefreshPlan(innerTask) + err := c.RefreshPlan(innerTask) + if err != nil { + c.updateTask(innerTask.plan.GetPlanID(), setState(failed), endSpan()) + c.scheduler.Finish(innerTask.dataNodeID, innerTask.plan) + log.Warn("failed to refresh task", + zap.Int64("plan", task.plan.PlanID), + zap.Error(err)) + continue + } getOrCreateIOPool().Submit(func() (any, error) { ctx := tracer.SetupSpan(context.Background(), innerTask.span) plan := innerTask.plan diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index cbb3dd6dc8..a0d711cb49 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -221,50 +222,169 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { }}, }, ) - s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { - return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - Level: datapb.SegmentLevel_L0, - InsertChannel: channel, - State: commonpb.SegmentState_Flushed, - Deltalogs: deltalogs, - }} - }) - // 2 l0 segments - plan := &datapb.CompactionPlan{ - PlanID: 1, - SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: 100, + s.Run("normal_refresh", func() { + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: segID, Level: datapb.SegmentLevel_L0, InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Deltalogs: deltalogs, + }} + }).Times(2) + // 2 l0 segments + plan := &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + }, + { + SegmentID: 101, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + }, }, - { - SegmentID: 101, - Level: datapb.SegmentLevel_L0, - InsertChannel: channel, - }, - }, - Type: datapb.CompactionType_Level0DeleteCompaction, - } + Type: datapb.CompactionType_Level0DeleteCompaction, + } - task := &compactionTask{ - triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10}, - state: executing, - plan: plan, - dataNodeID: 1, - } + task := &compactionTask{ + triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10}, + state: executing, + plan: plan, + dataNodeID: 1, + } - handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc) - handler.RefreshPlan(task) + handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc) + err := handler.RefreshPlan(task) + s.Require().NoError(err) - s.Equal(5, len(task.plan.GetSegmentBinlogs())) - segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 { - return b.GetSegmentID() + s.Equal(5, len(task.plan.GetSegmentBinlogs())) + segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 { + return b.GetSegmentID() + }) + + s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs) }) - s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs) + s.Run("segment_not_found", func() { + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return nil + }).Once() + plan := &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 102, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + }, + }, + Type: datapb.CompactionType_Level0DeleteCompaction, + } + + task := &compactionTask{ + triggerInfo: &compactionSignal{id: 19531, collectionID: 1, partitionID: 10}, + state: executing, + plan: plan, + dataNodeID: 1, + } + + handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc) + err := handler.RefreshPlan(task) + s.Error(err) + s.ErrorIs(err, merr.ErrSegmentNotFound) + }) +} + +func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() { + channel := "Ch-1" + binlogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} + + s.Run("normal_refresh", func() { + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Binlogs: binlogs, + }} + }).Times(2) + // 2 l0 segments + plan := &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 200, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + }, + { + SegmentID: 201, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + }, + }, + Type: datapb.CompactionType_MixCompaction, + } + + task := &compactionTask{ + triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10}, + state: executing, + plan: plan, + dataNodeID: 1, + } + + handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc) + err := handler.RefreshPlan(task) + s.Require().NoError(err) + + s.Equal(2, len(task.plan.GetSegmentBinlogs())) + segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 { + return b.GetSegmentID() + }) + + s.ElementsMatch([]int64{200, 201}, segIDs) + }) + + s.Run("segment_not_found", func() { + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return nil + }).Once() + // 2 l0 segments + plan := &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 200, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + }, + { + SegmentID: 201, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + }, + }, + Type: datapb.CompactionType_MixCompaction, + } + + task := &compactionTask{ + triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10}, + state: executing, + plan: plan, + dataNodeID: 1, + } + + handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc) + err := handler.RefreshPlan(task) + s.Error(err) + s.ErrorIs(err, merr.ErrSegmentNotFound) + }) } func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {