mirror of https://github.com/milvus-io/milvus.git
enhance: Skip generate load segment task (#29724)
issue: #29814 if channel is not subscribed yet, the generated load segment task will be remove from task scheduler due to the load segment task need to be transfer to worker node by shard leader. This PR skip generate load segment task when channel is not subscribed yet. Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/29922/head
parent
e1258b8cad
commit
565fc3a019
|
@ -137,10 +137,21 @@ func (suite *CheckerControllerSuite) TestBasic() {
|
|||
suite.controller.Start()
|
||||
defer suite.controller.Stop()
|
||||
|
||||
// expect assign channel first
|
||||
suite.Eventually(func() bool {
|
||||
suite.controller.Check()
|
||||
return counter.Load() > 0 && assignSegCounter.Load() > 0 && assingChanCounter.Load() > 0
|
||||
}, 5*time.Second, 1*time.Millisecond)
|
||||
return counter.Load() > 0 && assingChanCounter.Load() > 0
|
||||
}, 3*time.Second, 1*time.Millisecond)
|
||||
|
||||
// until new channel has been subscribed
|
||||
suite.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2"))
|
||||
suite.dist.LeaderViewManager.Update(1, utils.CreateTestLeaderView(1, 1, "test-insert-channel2", map[int64]int64{}, map[int64]*meta.Segment{}))
|
||||
|
||||
// expect assign segment after channel has been subscribed
|
||||
suite.Eventually(func() bool {
|
||||
suite.controller.Check()
|
||||
return counter.Load() > 0 && assignSegCounter.Load() > 0
|
||||
}, 3*time.Second, 1*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestCheckControllerSuite(t *testing.T) {
|
||||
|
|
|
@ -344,39 +344,44 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
|
|||
return nil
|
||||
}
|
||||
|
||||
isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
|
||||
|
||||
shardSegments := make(map[string][]*meta.Segment)
|
||||
for _, s := range segments {
|
||||
if isLevel0 &&
|
||||
len(c.dist.LeaderViewManager.GetLeadersByShard(s.GetInsertChannel())) == 0 {
|
||||
continue
|
||||
// filter out stopping nodes and outbound nodes
|
||||
outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica)
|
||||
availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool {
|
||||
stop, err := c.nodeMgr.IsStoppingNode(node)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
channel := s.GetInsertChannel()
|
||||
packedSegments := shardSegments[channel]
|
||||
packedSegments = append(packedSegments, &meta.Segment{
|
||||
SegmentInfo: s,
|
||||
})
|
||||
shardSegments[channel] = packedSegments
|
||||
return !outboundNodes.Contain(node) && !stop
|
||||
})
|
||||
|
||||
if len(availableNodes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
|
||||
shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
|
||||
return s.GetInsertChannel()
|
||||
})
|
||||
|
||||
plans := make([]balance.SegmentAssignPlan, 0)
|
||||
for shard, segments := range shardSegments {
|
||||
outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica)
|
||||
availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool {
|
||||
stop, err := c.nodeMgr.IsStoppingNode(node)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
// if channel is not subscribed yet, skip load segments
|
||||
if len(c.dist.LeaderViewManager.GetLeadersByShard(shard)) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if isLevel0 {
|
||||
leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard)
|
||||
return !outboundNodes.Contain(node) && !stop && node == leader.ID
|
||||
// L0 segment can only be assign to shard leader's node
|
||||
if isLevel0 {
|
||||
leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard)
|
||||
availableNodes = []int64{leader.ID}
|
||||
}
|
||||
|
||||
segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment {
|
||||
return &meta.Segment{
|
||||
SegmentInfo: s,
|
||||
}
|
||||
return !outboundNodes.Contain(node) && !stop
|
||||
})
|
||||
|
||||
shardPlans := c.balancer.AssignSegment(replica.CollectionID, segments, availableNodes)
|
||||
shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes)
|
||||
for i := range shardPlans {
|
||||
shardPlans[i].ReplicaID = replica.GetID()
|
||||
}
|
||||
|
|
|
@ -160,6 +160,42 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
|
|||
suite.Len(tasks, 1)
|
||||
}
|
||||
|
||||
func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
|
||||
checker := suite.checker
|
||||
// set meta
|
||||
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
|
||||
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
|
||||
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
|
||||
|
||||
// set target
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
PartitionID: 1,
|
||||
InsertChannel: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
// when channel not subscribed, segment_checker won't generate load segment task
|
||||
tasks := checker.Check(context.TODO())
|
||||
suite.Len(tasks, 0)
|
||||
}
|
||||
|
||||
func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
|
||||
checker := suite.checker
|
||||
// set meta
|
||||
|
|
Loading…
Reference in New Issue