From ae0f467c0246abea9a8e120bf6ca13d044949bfa Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 27 Mar 2023 18:28:00 +0800 Subject: [PATCH] Fix segment/channel may be re-loaded/subscribed (#22969) Signed-off-by: yah01 --- internal/querycoordv2/task/scheduler.go | 18 ++++++++++ internal/querycoordv2/task/task_test.go | 46 +++++++++++++++++++------ 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 64ba330755..8a0e62a4bf 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/merr" . "github.com/milvus-io/milvus/internal/util/typeutil" @@ -222,6 +223,7 @@ func (scheduler *taskScheduler) Add(task Task) error { err := scheduler.preAdd(task) if err != nil { + task.Cancel(err) return err } @@ -265,6 +267,14 @@ func (scheduler *taskScheduler) preAdd(task Task) error { return merr.WrapErrServiceInternal("task with the same segment exists") } + if GetTaskType(task) == TaskTypeGrow { + nodesWithSegment := scheduler.distMgr.LeaderViewManager.GetSealedSegmentDist(task.SegmentID()) + replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithSegment) + if _, ok := replicaNodeMap[task.ReplicaID()]; ok { + return merr.WrapErrServiceInternal("segment loaded, it can be only balanced") + } + } + case *ChannelTask: index := replicaChannelIndex{task.ReplicaID(), task.Channel()} if old, ok := scheduler.channelTasks[index]; ok { @@ -283,6 +293,14 @@ func (scheduler *taskScheduler) preAdd(task Task) error { return merr.WrapErrServiceInternal("task with the same channel exists") } + if GetTaskType(task) == TaskTypeGrow { + nodesWithChannel := scheduler.distMgr.LeaderViewManager.GetChannelDist(task.Channel()) + replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithChannel) + if _, ok := replicaNodeMap[task.ReplicaID()]; ok { + return merr.WrapErrServiceInternal("channel subscribed, it can be only balanced") + } + } + default: panic(fmt.Sprintf("preAdd: forget to process task type: %+v", task)) } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index d752ad55ae..649da0b0cb 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -185,14 +185,6 @@ func (suite *TaskSuite) TestSubscribeChannelTask() { Return(&schemapb.CollectionSchema{ Name: "TestSubscribeChannelTask", }, nil) - channels := make([]*datapb.VchannelInfo, 0, len(suite.subChannels)) - for _, channel := range suite.subChannels { - channels = append(channels, &datapb.VchannelInfo{ - CollectionID: suite.collection, - ChannelName: channel, - UnflushedSegmentIds: []int64{suite.growingSegments[channel]}, - }) - } for channel, segment := range suite.growingSegments { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment). Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{ @@ -262,6 +254,42 @@ func (suite *TaskSuite) TestSubscribeChannelTask() { } } +func (suite *TaskSuite) TestSubmitDuplicateSubscribeChannelTask() { + ctx := context.Background() + timeout := 10 * time.Second + targetNode := int64(3) + + tasks := []Task{} + for _, channel := range suite.subChannels { + task, err := NewChannelTask( + ctx, + timeout, + 0, + suite.collection, + suite.replica, + NewChannelAction(targetNode, ActionTypeGrow, channel), + ) + suite.NoError(err) + tasks = append(tasks, task) + } + + views := make([]*meta.LeaderView, 0) + for _, channel := range suite.subChannels { + views = append(views, &meta.LeaderView{ + ID: targetNode, + CollectionID: suite.collection, + Channel: channel, + }) + } + suite.dist.LeaderViewManager.Update(targetNode, views...) + + for _, task := range tasks { + err := suite.scheduler.Add(task) + suite.Equal(TaskStatusCanceled, task.Status()) + suite.Error(err) + } +} + func (suite *TaskSuite) TestUnsubscribeChannelTask() { ctx := context.Background() timeout := 10 * time.Second @@ -1045,7 +1073,6 @@ func (suite *TaskSuite) TestNoExecutor() { CollectionID: suite.collection, ChannelName: channel.ChannelName, })) - tasks := []Task{} segments := make([]*datapb.SegmentBinlogs, 0) for _, segment := range suite.loadSegments { segments = append(segments, &datapb.SegmentBinlogs{ @@ -1061,7 +1088,6 @@ func (suite *TaskSuite) TestNoExecutor() { NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) suite.NoError(err) - tasks = append(tasks, task) err = suite.scheduler.Add(task) suite.NoError(err) }