enhance: Skip generate load segment task (#29724) (#29982)

issue: #29814
pr: #29724
If the channel is not subscribed yet, the generated load segment task will
be removed from the task scheduler, because a load segment task must be
transferred to the worker node by the shard leader.

This PR skips generating load segment tasks while the channel is not
subscribed yet.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
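
Editor's note: a minimal, self-contained sketch of the behavior this commit introduces. The types below are simplified stand-ins for illustration only, not the actual Milvus QueryCoord API; the real change lives in SegmentChecker.createSegmentLoadTasks shown in the diff further down.

package main

import "fmt"

// leaderViewManager is a hypothetical stand-in: a "leader view" exists for a
// channel only after a delegator has subscribed it.
type leaderViewManager struct {
    leadersByChannel map[string][]int64 // channel -> nodes hosting a leader view
}

func (m *leaderViewManager) GetLeadersByShard(channel string) []int64 {
    return m.leadersByChannel[channel]
}

// createSegmentLoadPlans mimics the new behavior: segments whose channel has no
// leader view yet are skipped, instead of generating tasks that the task
// scheduler would only remove again.
func createSegmentLoadPlans(lvm *leaderViewManager, segmentsByChannel map[string][]int64) map[string][]int64 {
    plans := make(map[string][]int64)
    for channel, segmentIDs := range segmentsByChannel {
        if len(lvm.GetLeadersByShard(channel)) == 0 {
            continue // channel not subscribed yet; a later check round will retry
        }
        plans[channel] = segmentIDs
    }
    return plans
}

func main() {
    lvm := &leaderViewManager{leadersByChannel: map[string][]int64{
        "test-insert-channel": {1}, // already subscribed by node 1
    }}
    segments := map[string][]int64{
        "test-insert-channel":  {100, 101}, // planned
        "test-insert-channel2": {200},      // not subscribed yet -> skipped
    }
    fmt.Println(createSegmentLoadPlans(lvm, segments))
}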
wei liu 2024-01-16 10:12:52 +08:00 committed by GitHub
parent f2a0c3f02b
commit 81fdb6f472
3 changed files with 79 additions and 27 deletions


@@ -137,10 +137,21 @@ func (suite *CheckerControllerSuite) TestBasic() {
 	suite.controller.Start()
 	defer suite.controller.Stop()
 
+	// expect assign channel first
 	suite.Eventually(func() bool {
 		suite.controller.Check()
-		return counter.Load() > 0 && assignSegCounter.Load() > 0 && assingChanCounter.Load() > 0
-	}, 5*time.Second, 1*time.Millisecond)
+		return counter.Load() > 0 && assingChanCounter.Load() > 0
+	}, 3*time.Second, 1*time.Millisecond)
+
+	// until new channel has been subscribed
+	suite.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2"))
+	suite.dist.LeaderViewManager.Update(1, utils.CreateTestLeaderView(1, 1, "test-insert-channel2", map[int64]int64{}, map[int64]*meta.Segment{}))
+
+	// expect assign segment after channel has been subscribed
+	suite.Eventually(func() bool {
+		suite.controller.Check()
+		return counter.Load() > 0 && assignSegCounter.Load() > 0
+	}, 3*time.Second, 1*time.Millisecond)
 }
 
 func TestCheckControllerSuite(t *testing.T) {


@@ -339,39 +339,44 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
 		return nil
 	}
 
-	isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
-	shardSegments := make(map[string][]*meta.Segment)
-	for _, s := range segments {
-		if isLevel0 &&
-			len(c.dist.LeaderViewManager.GetLeadersByShard(s.GetInsertChannel())) == 0 {
-			continue
-		}
-
-		channel := s.GetInsertChannel()
-		packedSegments := shardSegments[channel]
-		packedSegments = append(packedSegments, &meta.Segment{
-			SegmentInfo: s,
-		})
-		shardSegments[channel] = packedSegments
-	}
-
-	plans := make([]balance.SegmentAssignPlan, 0)
-	for shard, segments := range shardSegments {
+	// filter out stopping nodes and outbound nodes
 	outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica)
 	availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool {
 		stop, err := c.nodeMgr.IsStoppingNode(node)
 		if err != nil {
 			return false
 		}
-		if isLevel0 {
-			leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard)
-			return !outboundNodes.Contain(node) && !stop && node == leader.ID
-		}
 		return !outboundNodes.Contain(node) && !stop
 	})
 
-	shardPlans := c.balancer.AssignSegment(replica.CollectionID, segments, availableNodes)
+	if len(availableNodes) == 0 {
+		return nil
+	}
+
+	isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
+	shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
+		return s.GetInsertChannel()
+	})
+
+	plans := make([]balance.SegmentAssignPlan, 0)
+	for shard, segments := range shardSegments {
+		// if channel is not subscribed yet, skip load segments
+		if len(c.dist.LeaderViewManager.GetLeadersByShard(shard)) == 0 {
+			continue
+		}
+
+		// L0 segment can only be assign to shard leader's node
+		if isLevel0 {
+			leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard)
+			availableNodes = []int64{leader.ID}
+		}
+
+		segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment {
+			return &meta.Segment{
+				SegmentInfo: s,
+			}
+		})
+		shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes)
 	for i := range shardPlans {
 		shardPlans[i].ReplicaID = replica.GetID()
 	}
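
Editor's note: the new code path leans on helpers from github.com/samber/lo that replace the hand-written grouping loop and node filter in the old version. A small standalone illustration of how they behave, using a placeholder segment type (not the Milvus one):

package main

import (
    "fmt"

    "github.com/samber/lo"
)

// segment is a placeholder type for illustration only.
type segment struct {
    ID      int64
    Channel string
}

func main() {
    segments := []segment{
        {ID: 1, Channel: "ch-1"},
        {ID: 2, Channel: "ch-1"},
        {ID: 3, Channel: "ch-2"},
    }

    // lo.GroupBy buckets segments by their insert channel, replacing the
    // manual map/append loop from the old createSegmentLoadTasks.
    byChannel := lo.GroupBy(segments, func(s segment) string { return s.Channel })

    // lo.Filter keeps only elements passing the predicate, as used to drop
    // stopping and outbound nodes before segment assignment.
    nodes := []int64{1, 2, 3}
    stopping := map[int64]bool{2: true}
    available := lo.Filter(nodes, func(node int64, _ int) bool { return !stopping[node] })

    fmt.Println(byChannel) // segments grouped per channel
    fmt.Println(available) // [1 3]
}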


@@ -149,6 +149,42 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
 	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 }
 
+func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
+	checker := suite.checker
+	// set meta
+	checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
+	checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
+	checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
+	suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
+	suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
+	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
+	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
+
+	// set target
+	segments := []*datapb.SegmentInfo{
+		{
+			ID:            1,
+			PartitionID:   1,
+			InsertChannel: "test-insert-channel",
+		},
+	}
+	channels := []*datapb.VchannelInfo{
+		{
+			CollectionID: 1,
+			ChannelName:  "test-insert-channel",
+		},
+	}
+	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
+		channels, segments, nil)
+	checker.targetMgr.UpdateCollectionNextTarget(int64(1))
+
+	// when channel not subscribed, segment_checker won't generate load segment task
+	tasks := checker.Check(context.TODO())
+	suite.Len(tasks, 0)
+}
+
 func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
 	checker := suite.checker
 	// set meta