From 81fdb6f472086ae578dcba278a74e6145eadf101 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 16 Jan 2024 10:12:52 +0800 Subject: [PATCH] enhance: Skip generate load segment task (#29724) (#29982) issue: #29814 pr: #29724 If the channel is not subscribed yet, the generated load segment task will be removed from the task scheduler, because a load segment task needs to be transferred to the worker node by the shard leader. This PR skips generating load segment tasks when the channel is not subscribed yet. Signed-off-by: Wei Liu --- .../querycoordv2/checkers/controller_test.go | 15 ++++- .../querycoordv2/checkers/segment_checker.go | 55 ++++++++++--------- .../checkers/segment_checker_test.go | 36 ++++++++++++ 3 files changed, 79 insertions(+), 27 deletions(-) diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 9126a30f47..10bea36a75 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -137,10 +137,21 @@ func (suite *CheckerControllerSuite) TestBasic() { suite.controller.Start() defer suite.controller.Stop() + // expect assign channel first suite.Eventually(func() bool { suite.controller.Check() - return counter.Load() > 0 && assignSegCounter.Load() > 0 && assingChanCounter.Load() > 0 - }, 5*time.Second, 1*time.Millisecond) + return counter.Load() > 0 && assingChanCounter.Load() > 0 + }, 3*time.Second, 1*time.Millisecond) + + // until new channel has been subscribed + suite.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2")) + suite.dist.LeaderViewManager.Update(1, utils.CreateTestLeaderView(1, 1, "test-insert-channel2", map[int64]int64{}, map[int64]*meta.Segment{})) + + // expect assign segment after channel has been subscribed + suite.Eventually(func() bool { + suite.controller.Check() + return counter.Load() > 0 && assignSegCounter.Load() > 0 + }, 3*time.Second, 1*time.Millisecond) } func TestCheckControllerSuite(t
*testing.T) { diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index f3c98a95f2..d5f4e6f9e6 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -339,39 +339,44 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] return nil } - isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0 - - shardSegments := make(map[string][]*meta.Segment) - for _, s := range segments { - if isLevel0 && - len(c.dist.LeaderViewManager.GetLeadersByShard(s.GetInsertChannel())) == 0 { - continue + // filter out stopping nodes and outbound nodes + outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) + availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { + stop, err := c.nodeMgr.IsStoppingNode(node) + if err != nil { + return false } - channel := s.GetInsertChannel() - packedSegments := shardSegments[channel] - packedSegments = append(packedSegments, &meta.Segment{ - SegmentInfo: s, - }) - shardSegments[channel] = packedSegments + return !outboundNodes.Contain(node) && !stop + }) + + if len(availableNodes) == 0 { + return nil } + isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0 + shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string { + return s.GetInsertChannel() + }) + plans := make([]balance.SegmentAssignPlan, 0) for shard, segments := range shardSegments { - outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { - stop, err := c.nodeMgr.IsStoppingNode(node) - if err != nil { - return false - } + // if channel is not subscribed yet, skip load segments + if len(c.dist.LeaderViewManager.GetLeadersByShard(shard)) == 0 { + continue + } - if isLevel0 { - leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard) - return 
!outboundNodes.Contain(node) && !stop && node == leader.ID + // L0 segment can only be assign to shard leader's node + if isLevel0 { + leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard) + availableNodes = []int64{leader.ID} + } + + segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment { + return &meta.Segment{ + SegmentInfo: s, } - return !outboundNodes.Contain(node) && !stop }) - - shardPlans := c.balancer.AssignSegment(replica.CollectionID, segments, availableNodes) + shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes) for i := range shardPlans { shardPlans[i].ReplicaID = replica.GetID() } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index e88aacc6e3..af6495256e 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -149,6 +149,42 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } +func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() { + checker := suite.checker + // set meta + checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // set target + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + + 
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + + // when channel not subscribed, segment_checker won't generate load segment task + tasks := checker.Check(context.TODO()) + suite.Len(tasks, 0) +} + func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() { checker := suite.checker // set meta