enhance: Remove load task limit in one round (#38436)

the task limit in assignSegment/assignChannel will works for both load
task and balance task.

this PR remove the load task limit, only limit balance task num in one
round.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/38485/head^2
wei liu 2024-12-16 19:30:43 +08:00 committed by GitHub
parent 9c8c1b3bb7
commit 659847c11f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 28 additions and 36 deletions

View File

@ -58,8 +58,8 @@ func (chanPlan *ChannelAssignPlan) String() string {
}
type Balance interface {
AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan
AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan
AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan
BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan)
}
@ -68,9 +68,9 @@ type RoundRobinBalancer struct {
nodeManager *session.NodeManager
}
func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
if !forceAssign {
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
info := b.nodeManager.Get(node)
return info != nil && info.GetState() == session.NodeStateNormal
@ -104,9 +104,9 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int
return ret
}
func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
if !forceAssign {
versionRangeFilter := semver.MustParseRange(">2.3.x")
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
info := b.nodeManager.Get(node)

View File

@ -42,9 +42,8 @@ type RowCountBasedBalancer struct {
// AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count.
// try to make every query node has same row count.
func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
if !forceAssign {
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
info := b.nodeManager.Get(node)
return info != nil && info.GetState() == session.NodeStateNormal
@ -87,9 +86,9 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID
// AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count.
// try to make every query node has channel count
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
if !forceAssign {
versionRangeFilter := semver.MustParseRange(">2.3.x")
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
info := b.nodeManager.Get(node)

View File

@ -51,14 +51,14 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
}
// AssignSegment got a segment list, and try to assign each segment to node's with lowest score
func (b *ScoreBasedBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
func (b *ScoreBasedBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
br := NewBalanceReport()
return b.assignSegment(br, collectionID, segments, nodes, manualBalance)
return b.assignSegment(br, collectionID, segments, nodes, forceAssign)
}
func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
balanceBatchSize := math.MaxInt64
if !forceAssign {
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
info := b.nodeManager.Get(node)
normalNode := info != nil && info.GetState() == session.NodeStateNormal
@ -67,6 +67,7 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
}
return normalNode
})
balanceBatchSize = paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
}
// calculate each node's score
@ -92,10 +93,6 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
})
balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
if manualBalance {
balanceBatchSize = math.MaxInt64
}
plans := make([]SegmentAssignPlan, 0, len(segments))
for _, s := range segments {
func(s *meta.Segment) {
@ -108,8 +105,8 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
sourceNode := nodeItemsMap[s.Node]
// if segment's node exist, which means this segment comes from balancer. we should consider the benefit
// if the segment reassignment doesn't got enough benefit, we should skip this reassignment
// notice: we should skip benefit check for manual balance
if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
// notice: we should skip benefit check for forceAssign
if !forceAssign && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
br.AddRecord(StrRecordf("skip generate balance plan for segment %d since no enough benefit", s.ID))
return
}
@ -146,14 +143,14 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
return plans
}
func (b *ScoreBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *ScoreBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
br := NewBalanceReport()
return b.assignChannel(br, collectionID, channels, nodes, manualBalance)
return b.assignChannel(br, collectionID, channels, nodes, forceAssign)
}
func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
balanceBatchSize := math.MaxInt64
if !forceAssign {
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
info := b.nodeManager.Get(node)
normalNode := info != nil && info.GetState() == session.NodeStateNormal
@ -162,6 +159,7 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
}
return normalNode
})
balanceBatchSize = paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.GetAsInt()
}
// calculate each node's score
@ -174,11 +172,6 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
for _, item := range nodeItemsMap {
queue.push(item)
}
balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.GetAsInt()
if manualBalance {
balanceBatchSize = math.MaxInt64
}
plans := make([]ChannelAssignPlan, 0, len(channels))
for _, ch := range channels {
func(ch *meta.DmChannel) {
@ -191,8 +184,8 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
sourceNode := nodeItemsMap[ch.Node]
// if segment's node exist, which means this segment comes from balancer. we should consider the benefit
// if the segment reassignment doesn't got enough benefit, we should skip this reassignment
// notice: we should skip benefit check for manual balance
if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
// notice: we should skip benefit check for forceAssign
if !forceAssign && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
br.AddRecord(StrRecordf("skip generate balance plan for channel %s since no enough benefit", ch.GetChannelName()))
return
}

View File

@ -232,7 +232,7 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*
if len(rwNodes) == 0 {
rwNodes = replica.GetRWNodes()
}
plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, false)
plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, true)
plans = append(plans, plan...)
}

View File

@ -436,7 +436,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
SegmentInfo: s,
}
})
shardPlans := c.getBalancerFunc().AssignSegment(ctx, replica.GetCollectionID(), segmentInfos, rwNodes, false)
shardPlans := c.getBalancerFunc().AssignSegment(ctx, replica.GetCollectionID(), segmentInfos, rwNodes, true)
for i := range shardPlans {
shardPlans[i].Replica = replica
}