diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 9126a30f47..10bea36a75 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -137,10 +137,21 @@ func (suite *CheckerControllerSuite) TestBasic() { suite.controller.Start() defer suite.controller.Stop() + // expect assign channel first suite.Eventually(func() bool { suite.controller.Check() - return counter.Load() > 0 && assignSegCounter.Load() > 0 && assingChanCounter.Load() > 0 - }, 5*time.Second, 1*time.Millisecond) + return counter.Load() > 0 && assingChanCounter.Load() > 0 + }, 3*time.Second, 1*time.Millisecond) + + // until new channel has been subscribed + suite.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2")) + suite.dist.LeaderViewManager.Update(1, utils.CreateTestLeaderView(1, 1, "test-insert-channel2", map[int64]int64{}, map[int64]*meta.Segment{})) + + // expect assign segment after channel has been subscribed + suite.Eventually(func() bool { + suite.controller.Check() + return counter.Load() > 0 && assignSegCounter.Load() > 0 + }, 3*time.Second, 1*time.Millisecond) } func TestCheckControllerSuite(t *testing.T) { diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index f3c98a95f2..d5f4e6f9e6 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -339,39 +339,44 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] return nil } - isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0 - - shardSegments := make(map[string][]*meta.Segment) - for _, s := range segments { - if isLevel0 && - len(c.dist.LeaderViewManager.GetLeadersByShard(s.GetInsertChannel())) == 0 { - continue + // filter out stopping nodes and outbound nodes + outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) + availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { + stop, err := c.nodeMgr.IsStoppingNode(node) + if err != nil { + return false } - channel := s.GetInsertChannel() - packedSegments := shardSegments[channel] - packedSegments = append(packedSegments, &meta.Segment{ - SegmentInfo: s, - }) - shardSegments[channel] = packedSegments + return !outboundNodes.Contain(node) && !stop + }) + + if len(availableNodes) == 0 { + return nil } + isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0 + shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string { + return s.GetInsertChannel() + }) + plans := make([]balance.SegmentAssignPlan, 0) for shard, segments := range shardSegments { - outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { - stop, err := c.nodeMgr.IsStoppingNode(node) - if err != nil { - return false - } + // if channel is not subscribed yet, skip load segments + if len(c.dist.LeaderViewManager.GetLeadersByShard(shard)) == 0 { + continue + } - if isLevel0 { - leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard) - return !outboundNodes.Contain(node) && !stop && node == leader.ID + // L0 segment can only be assign to shard leader's node + if isLevel0 { + leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard) + availableNodes = []int64{leader.ID} + } + + segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment { + return &meta.Segment{ + SegmentInfo: s, } - return !outboundNodes.Contain(node) && !stop }) - - shardPlans := c.balancer.AssignSegment(replica.CollectionID, segments, availableNodes) + shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes) for i := range shardPlans { shardPlans[i].ReplicaID = replica.GetID() } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index e88aacc6e3..af6495256e 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -149,6 +149,42 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } +func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() { + checker := suite.checker + // set meta + checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // set target + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + + // when channel not subscribed, segment_checker won't generate load segment task + tasks := checker.Check(context.TODO()) + suite.Len(tasks, 0) +} + func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() { checker := suite.checker // set meta