enhance: Avoid balance stuck after segment list becomes stable (#34728)

issue: #34715
If a collection's segment list no longer changes, the next target will be
empty most of the time. Segment balancing checks that a segment exists in
both the current and the next target before moving it, so balancing can be
blocked while the next target is empty.

This PR permits a segment to be moved when the next target is empty, to
avoid the balance getting stuck.
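
A minimal self-contained sketch of the before/after movability predicate
(the inCurrent/inNext helpers are hypothetical stand-ins for the
targetMgr.GetSealedSegment lookups shown in the diffs below):

package main

import "fmt"

// Hypothetical stand-ins for the two target lookups,
// targetMgr.GetSealedSegment(..., meta.CurrentTarget / meta.NextTarget).
func inCurrent(segmentID int64) bool { return true }
func inNext(segmentID int64) bool    { return false } // stable collection: next target is empty

func main() {
	const segmentID = 1001
	isL0 := false // L0 segments are never moved in either version

	// Before: a segment is movable only if it is in BOTH targets,
	// so an empty next target blocks every move.
	movableBefore := !isL0 && inCurrent(segmentID) && inNext(segmentID)

	// After (CanSegmentBeMoved): EITHER target is enough.
	movableAfter := !isL0 && (inCurrent(segmentID) || inNext(segmentID))

	fmt.Println(movableBefore, movableAfter) // false true
}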

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/34771/head
wei liu 2024-07-31 18:09:48 +08:00 committed by GitHub
parent a9744d62b1
commit 03912a8788
9 changed files with 33 additions and 53 deletions

View File

@@ -23,7 +23,6 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
@@ -148,9 +147,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica
 	for _, nodeID := range offlineNodes {
 		dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(nodeID), meta.WithChannel(channelName))
 		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
+			return b.targetMgr.CanSegmentBeMoved(segment)
 		})
 		plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false)
 		for i := range plans {
@@ -171,9 +168,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
 	for _, node := range onlineNodes {
 		dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node), meta.WithChannel(channelName))
 		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
+			return b.targetMgr.CanSegmentBeMoved(segment)
 		})
 		segmentDist[node] = segments
 		totalScore += nodeScore[node].getPriority()

View File

@@ -431,7 +431,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestBalanceOneRound() {
 		balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.distributions {
@@ -552,7 +551,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestBalanceMultiRound() {
 				append(balanceCase.nodes, balanceCase.notExistedNodes...)))
 			balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
 			balancer.targetMgr.UpdateCollectionCurrentTarget(balanceCase.collectionIDs[i])
-			balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
 		}
 
 		// 2. set up target for distribution for multi collections
@@ -711,7 +709,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestStoppedBalance() {
 		balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.distributions {
@@ -831,7 +828,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestMultiReplicaBalance() {
 		}
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.segmentDist {
@@ -924,7 +920,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestExclusiveChannelBalance_Cha
 	balancer.meta.ReplicaManager.Spawn(1, map[string]int{meta.DefaultResourceGroupName: 1}, []string{"channel1", "channel2"})
 	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 	balancer.targetMgr.UpdateCollectionCurrentTarget(collectionID)
-	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 
 	// 3. set up nodes info and resourceManager for balancer
 	nodeCount := 4
@@ -999,7 +994,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestExclusiveChannelBalance_Seg
 	balancer.meta.ReplicaManager.Spawn(1, map[string]int{meta.DefaultResourceGroupName: 1}, []string{"channel1", "channel2"})
 	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 	balancer.targetMgr.UpdateCollectionCurrentTarget(collectionID)
-	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 
 	// 3. set up nodes info and resourceManager for balancer
 	nodeCount := 4
@@ -1097,7 +1091,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestExclusiveChannelBalance_Nod
 	balancer.meta.ReplicaManager.Spawn(1, map[string]int{meta.DefaultResourceGroupName: 1}, []string{"channel1", "channel2"})
 	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 	balancer.targetMgr.UpdateCollectionCurrentTarget(collectionID)
-	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 
 	// 3. set up nodes info and resourceManager for balancer
 	nodeCount := 4
@@ -1222,7 +1215,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestExclusiveChannelBalance_Seg
 	balancer.meta.ReplicaManager.Spawn(1, map[string]int{meta.DefaultResourceGroupName: 1}, []string{"channel1", "channel2"})
 	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 	balancer.targetMgr.UpdateCollectionCurrentTarget(collectionID)
-	balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
 
 	// 3. set up nodes info and resourceManager for balancer
 	nodeCount := 4

View File

@@ -9,7 +9,6 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
@@ -511,9 +510,7 @@ func (b *MultiTargetBalancer) genSegmentPlan(replica *meta.Replica, rwNodes []in
 	for _, node := range rwNodes {
 		dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node))
 		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
+			return b.targetMgr.CanSegmentBeMoved(segment)
 		})
 		nodeSegments[node] = segments
 		globalNodeSegments[node] = b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(node))

View File

@@ -25,7 +25,6 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
@@ -220,9 +219,7 @@ func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, rw
 	for _, nodeID := range roNodes {
 		dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(nodeID))
 		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
+			return b.targetMgr.CanSegmentBeMoved(segment)
 		})
 		plans := b.AssignSegment(replica.GetCollectionID(), segments, rwNodes, false)
 		for i := range plans {
@@ -243,9 +240,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, rwNodes []
 	for _, node := range rwNodes {
 		dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node))
 		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
+			return b.targetMgr.CanSegmentBeMoved(segment)
 		})
 		rowCount := 0
 		for _, s := range segments {

View File

@@ -463,7 +463,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
 		suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, segments, nil)
 		balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
 		balancer.targetMgr.UpdateCollectionCurrentTarget(1)
-		balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
 		for node, s := range c.distributions {
 			balancer.dist.SegmentDistManager.Update(node, s...)
 		}
@@ -819,7 +818,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
 		suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, segments, nil)
 		balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
 		balancer.targetMgr.UpdateCollectionCurrentTarget(1)
-		balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
 		for node, s := range c.distributions {
 			balancer.dist.SegmentDistManager.Update(node, s...)
 		}
@@ -1051,7 +1049,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
 		suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, segments, nil)
 		balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
 		balancer.targetMgr.UpdateCollectionCurrentTarget(1)
-		balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
 		for node, s := range c.distributions {
 			balancer.dist.SegmentDistManager.Update(node, s...)
 		}
@@ -1178,7 +1175,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestMultiReplicaBalance() {
 		}
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.segmentDist {

View File

@@ -23,7 +23,6 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
@@ -265,9 +264,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin
 	for _, nodeID := range offlineNodes {
 		dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(nodeID))
 		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
+			return b.targetMgr.CanSegmentBeMoved(segment)
 		})
 		plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false)
 		for i := range plans {
@@ -288,9 +285,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
 	for _, node := range onlineNodes {
 		dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node))
 		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
+			return b.targetMgr.CanSegmentBeMoved(segment)
 		})
 		segmentDist[node] = segments
 		totalScore += nodeScore[node].getPriority()

View File

@@ -435,7 +435,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
 		balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.distributions {
@@ -631,7 +630,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
 		balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.distributions {
@@ -754,7 +752,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
 				append(balanceCase.nodes, balanceCase.notExistedNodes...)))
 			balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
 			balancer.targetMgr.UpdateCollectionCurrentTarget(balanceCase.collectionIDs[i])
-			balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
 		}
 
 		// 2. set up target for distribution for multi collections
@@ -897,7 +894,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
 		balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.distributions {
@@ -1017,7 +1013,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestMultiReplicaBalance() {
 		}
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
-		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 
 		// 2. set up target for distribution for multi collections
 		for node, s := range c.segmentDist {

View File

@@ -690,3 +690,22 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
 	return nil
 }
+
+// if the segment is not an L0 segment and exists in the current or next target, it can be moved
+func (mgr *TargetManager) CanSegmentBeMoved(segment *Segment) bool {
+	if segment.GetLevel() == datapb.SegmentLevel_L0 {
+		return false
+	}
+
+	current := mgr.current.getCollectionTarget(segment.CollectionID)
+	if current != nil && current.segments[segment.GetID()] != nil {
+		return true
+	}
+
+	next := mgr.next.getCollectionTarget(segment.CollectionID)
+	if next != nil && next.segments[segment.GetID()] != nil {
+		return true
+	}
+
+	return false
+}
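
For illustration, a self-contained restatement of the rule this helper
implements (canBeMoved below is a hypothetical stand-in operating on plain
booleans, not the real TargetManager method):

package main

import "fmt"

// canBeMoved restates CanSegmentBeMoved's rule on plain values:
// non-L0, and present in the current or the next target.
func canBeMoved(isL0, inCurrent, inNext bool) bool {
	if isL0 {
		return false
	}
	return inCurrent || inNext
}

func main() {
	// current target only (stable collection, empty next target):
	// movable now, was blocked before this PR
	fmt.Println(canBeMoved(false, true, false)) // true
	// next target only (target just updated): movable
	fmt.Println(canBeMoved(false, false, true)) // true
	// in neither target, or an L0 segment: not movable
	fmt.Println(canBeMoved(false, false, false), canBeMoved(true, true, true)) // false false
}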

View File

@@ -436,23 +436,19 @@ func (s *Server) CheckQueryNodeDistribution(ctx context.Context, req *querypb.Ch
 		}
 	}
 
 	// check segment list
 	segmentOnSrc := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetSourceNodeID()))
 	segmentOnDst := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetTargetNodeID()))
 	segmentDstMap := lo.SliceToMap(segmentOnDst, func(s *meta.Segment) (int64, *meta.Segment) {
 		return s.GetID(), s
 	})
-	for _, s := range segmentOnSrc {
-		if _, ok := segmentDstMap[s.GetID()]; !ok {
-			return merr.Status(merr.WrapErrSegmentLack(s.GetID())), nil
+
+	// check that every segment which exists in both the target and the source node has been loaded on the target node
+	for _, segment := range segmentOnSrc {
+		if s.targetMgr.GetSealedSegment(segment.CollectionID, segment.ID, meta.CurrentTargetFirst) == nil {
+			continue
 		}
-	}
-	segmentSrcMap := lo.SliceToMap(segmentOnSrc, func(s *meta.Segment) (int64, *meta.Segment) {
-		return s.GetID(), s
-	})
-	for _, s := range segmentOnDst {
-		if _, ok := segmentSrcMap[s.GetID()]; !ok {
-			return merr.Status(merr.WrapErrSegmentLack(s.GetID())), nil
+		if _, ok := segmentDstMap[segment.GetID()]; !ok {
+			return merr.Status(merr.WrapErrSegmentLack(segment.GetID())), nil
 		}
 	}
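
For illustration, a self-contained sketch of the reworked check: only
segments on the source node that are still in the target must also be
present on the destination node, and the old destination-to-source
comparison is gone (all names and data below are hypothetical):

package main

import "fmt"

// Hypothetical distribution and target state for illustration.
var (
	segmentsOnSrc = []int64{1, 2, 3}
	segmentsOnDst = map[int64]bool{1: true}
	inTarget      = map[int64]bool{1: true, 2: false, 3: true}
)

// checkDistribution mirrors the new loop: skip segments that left the
// target, fail only when a targeted segment is missing on the destination.
func checkDistribution() error {
	for _, id := range segmentsOnSrc {
		if !inTarget[id] {
			continue // no longer in target: ignore
		}
		if !segmentsOnDst[id] {
			return fmt.Errorf("segment %d is missing on the target node", id)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkDistribution()) // segment 3 is missing on the target node
}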