mirror of https://github.com/milvus-io/milvus.git

enhance: enable balance channel in querycoord (#28469)

issue: #23726
/kind improvement

1. enable auto balance channel between nodes in querycoord
2. make `genSegmentPlan` reuse the `AssignSegment` logic
3. make `genChannelPlan` reuse the `AssignChannel` logic

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

parent d2207cac27
commit 42e538b683
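The channel side of the change reuses the same greedy idea the balancer already applies to segments: repeatedly hand the next channel to whichever query node currently serves the fewest channels, bump that node's count, and push it back into the queue. Below is a minimal, self-contained sketch of that loop under simplified assumptions; nodeItem, nodeQueue and assignChannels are stand-in names for illustration, not the actual querycoord types.

package main

import (
	"container/heap"
	"fmt"
)

// nodeItem is a simplified stand-in for the balancer's per-node weight:
// the fewer channels a node already serves, the higher its priority.
type nodeItem struct {
	nodeID     int64
	channelCnt int
}

// nodeQueue is a min-heap over channel counts.
type nodeQueue []*nodeItem

func (q nodeQueue) Len() int           { return len(q) }
func (q nodeQueue) Less(i, j int) bool { return q[i].channelCnt < q[j].channelCnt }
func (q nodeQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *nodeQueue) Push(x any)        { *q = append(*q, x.(*nodeItem)) }
func (q *nodeQueue) Pop() any {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

// assignChannels gives each channel to the node that currently serves the
// fewest channels, then pushes the node back with its count increased,
// which is the same greedy loop the new AssignChannel performs.
func assignChannels(channels []string, current map[int64]int) map[string]int64 {
	q := nodeQueue{}
	for node, cnt := range current {
		q = append(q, &nodeItem{nodeID: node, channelCnt: cnt})
	}
	heap.Init(&q)

	plan := make(map[string]int64, len(channels))
	for _, ch := range channels {
		item := heap.Pop(&q).(*nodeItem)
		plan[ch] = item.nodeID
		item.channelCnt++
		heap.Push(&q, item)
	}
	return plan
}

func main() {
	// node 2 already serves two channels, node 3 serves none,
	// so both new channels land on node 3.
	fmt.Println(assignChannels([]string{"v4", "v5"}, map[int64]int{2: 2, 3: 0}))
}

Running the sketch prints a plan that sends both new channels to the node that started with nothing, which is the balancing behavior this commit enables for channels.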
@@ -17,6 +17,8 @@
package balance

import (
	"context"
	"math"
	"sort"

	"github.com/samber/lo"
@@ -35,8 +37,10 @@ type RowCountBasedBalancer struct {
	targetMgr *meta.TargetManager
}

// AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count.
// try to make every query node has same row count.
func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64) []SegmentAssignPlan {
	nodeItems := b.convertToNodeItems(nodes)
	nodeItems := b.convertToNodeItemsBySegment(nodes)
	if len(nodeItems) == 0 {
		return nil
	}
@@ -67,7 +71,37 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64) []SegmentAssignPlan {
	return plans
}

func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem {
// AssignChannel, when the row count based balancer assigns channels, it will assign each channel to the node with the least global channel count,
// trying to make every query node hold the same channel count.
func (b *RowCountBasedBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64) []ChannelAssignPlan {
	nodeItems := b.convertToNodeItemsByChannel(nodes)
	if len(nodeItems) == 0 {
		return nil
	}
	queue := newPriorityQueue()
	for _, item := range nodeItems {
		queue.push(item)
	}

	plans := make([]ChannelAssignPlan, 0, len(channels))
	for _, c := range channels {
		// pick the node with the least channel num and allocate to it.
		ni := queue.pop().(*nodeItem)
		plan := ChannelAssignPlan{
			From:    -1,
			To:      ni.nodeID,
			Channel: c,
		}
		plans = append(plans, plan)
		// change node's priority and push back
		p := ni.getPriority()
		ni.setPriority(p + 1)
		queue.push(ni)
	}
	return plans
}

func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []*nodeItem {
	ret := make([]*nodeItem, 0, len(nodeIDs))
	for _, nodeInfo := range b.getNodes(nodeIDs) {
		node := nodeInfo.ID()
@@ -92,76 +126,128 @@ func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem {
	return ret
}

func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*nodeItem {
	ret := make([]*nodeItem, 0, len(nodeIDs))
	for _, nodeInfo := range b.getNodes(nodeIDs) {
		node := nodeInfo.ID()
		channels := b.dist.ChannelDistManager.GetByNode(node)

		// more channel num, less priority
		nodeItem := newNodeItem(len(channels), node)
		ret = append(ret, &nodeItem)
	}
	return ret
}

func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.RowCountBasedBalancer", 1, 60).With(
		zap.Int64("collectionID", replica.GetCollectionID()),
		zap.Int64("replicaID", replica.GetCollectionID()),
		zap.String("resourceGroup", replica.Replica.GetResourceGroup()),
	)
	nodes := replica.GetNodes()
	if len(nodes) < 2 {
		return nil, nil
	}
	onlineNodesSegments := make(map[int64][]*meta.Segment)
	stoppingNodesSegments := make(map[int64][]*meta.Segment)
	outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica)

	totalCnt := 0
	for _, nid := range nodes {
		segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
		// Only balance segments in targets
		segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
		})
	onlineNodes := make([]int64, 0)
	offlineNodes := make([]int64, 0)

	for _, nid := range nodes {
		if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
			log.Info("not existed node", zap.Int64("nid", nid), zap.Any("segments", segments), zap.Error(err))
			log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err))
			continue
		} else if isStopping {
			stoppingNodesSegments[nid] = segments
			offlineNodes = append(offlineNodes, nid)
		} else if outboundNodes.Contain(nid) {
			// if node is stop or transfer to other rg
			log.RatedInfo(10, "meet outbound node, try to move out all segment/channel",
				zap.Int64("collectionID", replica.GetCollectionID()),
				zap.Int64("replicaID", replica.GetCollectionID()),
				zap.Int64("node", nid),
			)
			stoppingNodesSegments[nid] = segments
			log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid))
			offlineNodes = append(offlineNodes, nid)
		} else {
			onlineNodesSegments[nid] = segments
		}

		for _, s := range segments {
			totalCnt += int(s.GetNumOfRows())
			onlineNodes = append(onlineNodes, nid)
		}
	}

	if len(nodes) == len(stoppingNodesSegments) || len(onlineNodesSegments) == 0 {
	if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 {
		// no available nodes to balance
		return nil, nil
	}

	segmentsToMove := make([]*meta.Segment, 0)
	for _, stopSegments := range stoppingNodesSegments {
		segmentsToMove = append(segmentsToMove, stopSegments...)
	if len(offlineNodes) > 0 {
		log.Info("Balance for stopping nodes",
			zap.Any("stoppingNodes", offlineNodes),
			zap.Any("onlineNodes", onlineNodes),
		)
		return b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes), b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes)
	}

	// find nodes with less row count than average
	nodesWithLessRow := newPriorityQueue()
	average := totalCnt / len(onlineNodesSegments)
	for node, segments := range onlineNodesSegments {
		sort.Slice(segments, func(i, j int) bool {
			return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
		})
	return b.genSegmentPlan(replica, onlineNodes), b.genChannelPlan(replica, onlineNodes)
}

func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
	segmentPlans := make([]SegmentAssignPlan, 0)
	for _, nodeID := range offlineNodes {
		dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)
		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
		})
		plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes)
		for i := range plans {
			plans[i].From = nodeID
			plans[i].ReplicaID = replica.ID
		}
		segmentPlans = append(segmentPlans, plans...)
	}
	return segmentPlans
}

func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
	segmentsToMove := make([]*meta.Segment, 0)

	nodeRowCount := make(map[int64]int, 0)
	segmentDist := make(map[int64][]*meta.Segment)
	totalRowCount := 0
	for _, node := range onlineNodes {
		dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node)
		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
		})
		rowCount := 0
		for _, s := range segments {
			rowCount += int(s.GetNumOfRows())
			if rowCount <= average {
				continue
			}

			segmentsToMove = append(segmentsToMove, s)
		}
		if rowCount < average {
			item := newNodeItem(rowCount, node)
			nodesWithLessRow.push(&item)
		totalRowCount += rowCount
		segmentDist[node] = segments
		nodeRowCount[node] = rowCount
	}

	if totalRowCount == 0 {
		return nil
	}

	// find nodes with less row count than average
	average := totalRowCount / len(onlineNodes)
	nodesWithLessRow := make([]int64, 0)
	for node, segments := range segmentDist {
		sort.Slice(segments, func(i, j int) bool {
			return segments[i].GetNumOfRows() < segments[j].GetNumOfRows()
		})

		leftRowCount := nodeRowCount[node]
		if leftRowCount < average {
			nodesWithLessRow = append(nodesWithLessRow, node)
			continue
		}

		for _, s := range segments {
			leftRowCount -= int(s.GetNumOfRows())
			if leftRowCount < average {
				break
			}
			segmentsToMove = append(segmentsToMove, s)
		}
	}
@@ -170,49 +256,20 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
		return len(b.dist.SegmentDistManager.Get(s.GetID())) == 1
	})

	return b.genSegmentPlan(replica, nodesWithLessRow, segmentsToMove, average), b.genChannelPlan(replica, lo.Keys(onlineNodesSegments), lo.Keys(stoppingNodesSegments))
}

func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, nodesWithLessRowCount priorityQueue, segmentsToMove []*meta.Segment, average int) []SegmentAssignPlan {
	if nodesWithLessRowCount.Len() == 0 || len(segmentsToMove) == 0 {
	if len(nodesWithLessRow) == 0 || len(segmentsToMove) == 0 {
		return nil
	}

	sort.Slice(segmentsToMove, func(i, j int) bool {
		return segmentsToMove[i].GetNumOfRows() < segmentsToMove[j].GetNumOfRows()
	})

	// allocate segments to those nodes with row cnt less than average
	plans := make([]SegmentAssignPlan, 0)
	for _, s := range segmentsToMove {
		if nodesWithLessRowCount.Len() <= 0 {
			break
		}

		node := nodesWithLessRowCount.pop().(*nodeItem)
		newPriority := node.getPriority() + int(s.GetNumOfRows())
		if newPriority > average {
			nodesWithLessRowCount.push(node)
			continue
		}

		plan := SegmentAssignPlan{
			ReplicaID: replica.GetID(),
			From:      s.Node,
			To:        node.nodeID,
			Segment:   s,
		}
		plans = append(plans, plan)
		node.setPriority(newPriority)
		nodesWithLessRowCount.push(node)
	segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, nodesWithLessRow)
	for i := range segmentPlans {
		segmentPlans[i].From = segmentPlans[i].Segment.Node
		segmentPlans[i].ReplicaID = replica.ID
	}
	return plans

	return segmentPlans
}

func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
	log.Info("balance channel",
		zap.Int64s("online nodes", onlineNodes),
		zap.Int64s("offline nodes", offlineNodes))
func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
	channelPlans := make([]ChannelAssignPlan, 0)
	for _, nodeID := range offlineNodes {
		dmChannels := b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)
@@ -223,67 +280,45 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
		}
		channelPlans = append(channelPlans, plans...)
	}
	return channelPlans
}

	// if len(channelPlans) == 0 && len(onlineNodes) > 1 {
	// 	// start to balance channels on all available nodes
	// 	channels := b.dist.ChannelDistManager.GetByCollection(replica.CollectionID)
	// 	channelsOnNode := lo.GroupBy(channels, func(channel *meta.DmChannel) int64 { return channel.Node })
func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNodes []int64) []ChannelAssignPlan {
	channelPlans := make([]ChannelAssignPlan, 0)
	if len(onlineNodes) > 1 {
		// start to balance channels on all available nodes
		channelDist := b.dist.ChannelDistManager.GetByCollection(replica.CollectionID)
		if len(channelDist) == 0 {
			return nil
		}
		average := int(math.Ceil(float64(len(channelDist)) / float64(len(onlineNodes))))

	// 	nodes := replica.GetNodes()
	// 	getChannelNum := func(node int64) int {
	// 		if channelsOnNode[node] == nil {
	// 			return 0
	// 		}
	// 		return len(channelsOnNode[node])
	// 	}
	// 	sort.Slice(nodes, func(i, j int) bool { return getChannelNum(nodes[i]) < getChannelNum(nodes[j]) })
		// find nodes with less channel count than average
		nodeWithLessChannel := make([]int64, 0)
		channelsToMove := make([]*meta.DmChannel, 0)
		for _, node := range onlineNodes {
			channels := b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node)

	// 	start := int64(0)
	// 	end := int64(len(nodes) - 1)
			if len(channels) <= average {
				nodeWithLessChannel = append(nodeWithLessChannel, node)
				continue
			}

	// 	averageChannel := int(math.Ceil(float64(len(channels)) / float64(len(onlineNodes))))
	// 	if averageChannel == 0 || getChannelNum(nodes[start]) >= getChannelNum(nodes[end]) {
	// 		return channelPlans
	// 	}
			channelsToMove = append(channelsToMove, channels[average:]...)
		}

	// 	for start < end {
	// 		// segment to move in
	// 		targetNode := nodes[start]
	// 		// segment to move out
	// 		sourceNode := nodes[end]
		if len(nodeWithLessChannel) == 0 || len(channelsToMove) == 0 {
			return nil
		}

	// 		if len(channelsOnNode[sourceNode])-1 < averageChannel {
	// 			break
	// 		}
		channelPlans := b.AssignChannel(channelsToMove, nodeWithLessChannel)
		for i := range channelPlans {
			channelPlans[i].From = channelPlans[i].Channel.Node
			channelPlans[i].ReplicaID = replica.ID
		}

	// 		// remove channel from end node
	// 		selectChannel := channelsOnNode[sourceNode][0]
	// 		channelsOnNode[sourceNode] = channelsOnNode[sourceNode][1:]

	// 		// add channel to start node
	// 		if channelsOnNode[targetNode] == nil {
	// 			channelsOnNode[targetNode] = make([]*meta.DmChannel, 0)
	// 		}
	// 		channelsOnNode[targetNode] = append(channelsOnNode[targetNode], selectChannel)

	// 		// generate channel plan
	// 		plan := ChannelAssignPlan{
	// 			Channel:   selectChannel,
	// 			From:      sourceNode,
	// 			To:        targetNode,
	// 			ReplicaID: replica.ID,
	// 		}
	// 		channelPlans = append(channelPlans, plan)
	// 		for end > 0 && getChannelNum(nodes[end]) <= averageChannel {
	// 			end--
	// 		}

	// 		for start < end && getChannelNum(nodes[start]) >= averageChannel {
	// 			start++
	// 		}
	// 	}

	// }
		return channelPlans
	}
	return channelPlans
}
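For the normal (non-stopping) path, the genChannelPlan shown above computes a per-node target, average = ceil(total channels / online nodes), keeps nodes at or below that target as candidates to receive channels, and marks everything beyond the target on overloaded nodes as movable; the reused AssignChannel then spreads those channels over the under-loaded nodes. Below is a rough, self-contained sketch of just that selection step; the types are simplified and pickChannelsToMove is a hypothetical helper name, not querycoord code.

package main

import (
	"fmt"
	"math"
)

// pickChannelsToMove mirrors the selection step in genChannelPlan: nodes at or
// below the per-node average stay as candidates to receive channels, while
// nodes above it give up everything beyond the average.
func pickChannelsToMove(dist map[int64][]string) (receivers []int64, toMove []string) {
	total := 0
	for _, chs := range dist {
		total += len(chs)
	}
	if total == 0 {
		return nil, nil
	}
	average := int(math.Ceil(float64(total) / float64(len(dist))))

	for node, chs := range dist {
		if len(chs) <= average {
			receivers = append(receivers, node)
			continue
		}
		toMove = append(toMove, chs[average:]...)
	}
	return receivers, toMove
}

func main() {
	// Mirrors the "balance channel" test case: v2 and v3 on node 2, nothing on node 3.
	// average = ceil(2/2) = 1, so node 2 gives up one channel and node 3 can receive it.
	receivers, toMove := pickChannelsToMove(map[int64][]string{
		2: {"v2", "v3"},
		3: {},
	})
	fmt.Println(receivers, toMove)
}

With the distribution used by the "balance channel" test case in the next hunk, the average is ceil(2/2) = 1, so exactly one channel becomes movable and node 3 is the only receiver, which matches the expected plan that moves v3 from node 2 to node 3.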
@@ -248,34 +248,25 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
			},
		},
		// {
		// 	name: "balance channel",
		// 	nodes: []int64{2, 3},
		// 	segmentCnts: []int{2, 2},
		// 	states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
		// 	shouldMock: true,
		// 	distributions: map[int64][]*meta.Segment{
		// 		2: {
		// 			{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
		// 			{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
		// 		},
		// 		3: {
		// 			{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
		// 			{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
		// 		},
		// 	},
		// 	distributionChannels: map[int64][]*meta.DmChannel{
		// 		2: {
		// 			{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
		// 			{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2},
		// 		},
		// 		3: {},
		// 	},
		// 	expectPlans: []SegmentAssignPlan{},
		// 	expectChannelPlans: []ChannelAssignPlan{
		// 		{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, From: 2, To: 3, ReplicaID: 1},
		// 	},
		// },
		{
			name: "balance channel",
			nodes: []int64{2, 3},
			segmentCnts: []int{2, 2},
			states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
			shouldMock: true,
			distributions: map[int64][]*meta.Segment{},
			distributionChannels: map[int64][]*meta.DmChannel{
				2: {
					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2},
				},
				3: {},
			},
			expectPlans: []SegmentAssignPlan{},
			expectChannelPlans: []ChannelAssignPlan{
				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, From: 2, To: 3, ReplicaID: 1},
			},
		},
		{
			name: "unbalance stable view",
			nodes: []int64{1, 2, 3},
@@ -298,26 +289,26 @@
			expectPlans: []SegmentAssignPlan{},
			expectChannelPlans: []ChannelAssignPlan{},
		},
		// {
		// 	name: "balance unstable view",
		// 	nodes: []int64{1, 2, 3},
		// 	segmentCnts: []int{0, 0, 0},
		// 	states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
		// 	shouldMock: true,
		// 	distributions: map[int64][]*meta.Segment{},
		// 	distributionChannels: map[int64][]*meta.DmChannel{
		// 		1: {
		// 			{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1},
		// 			{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1},
		// 		},
		// 		2: {},
		// 		3: {},
		// 	},
		// 	expectPlans: []SegmentAssignPlan{},
		// 	expectChannelPlans: []ChannelAssignPlan{
		// 		{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1}, From: 1, To: 2, ReplicaID: 1},
		// 	},
		// },
		{
			name: "balance unstable view",
			nodes: []int64{1, 2, 3},
			segmentCnts: []int{0, 0, 0},
			states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
			shouldMock: true,
			distributions: map[int64][]*meta.Segment{},
			distributionChannels: map[int64][]*meta.DmChannel{
				1: {
					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1},
					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1},
				},
				2: {},
				3: {},
			},
			expectPlans: []SegmentAssignPlan{},
			expectChannelPlans: []ChannelAssignPlan{
				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, From: 1, To: 2, ReplicaID: 1},
			},
		},
		{
			name: "already balanced",
			nodes: []int64{11, 22},
@@ -377,7 +368,7 @@
			balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
			balancer.targetMgr.UpdateCollectionCurrentTarget(1)
			balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
			suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0)
			suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
			for node, s := range c.distributions {
				balancer.dist.SegmentDistManager.Update(node, s...)
			}
@@ -396,6 +387,14 @@
			segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
			suite.ElementsMatch(c.expectChannelPlans, channelPlans)
			suite.ElementsMatch(c.expectPlans, segmentPlans)

			// clear distribution
			for node := range c.distributions {
				balancer.dist.SegmentDistManager.Update(node)
			}
			for node := range c.distributionChannels {
				balancer.dist.ChannelDistManager.Update(node)
			}
		})
	}
}
@@ -576,7 +575,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
			suite.broker.ExpectedCalls = nil
			suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil)
			balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
			suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0)
			suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
			for node, s := range c.distributions {
				balancer.dist.SegmentDistManager.Update(node, s...)
			}
@@ -646,7 +645,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
		},
	}

	suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0)
	suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
	for _, c := range cases {
		suite.Run(c.name, func() {
			suite.SetupSuite()
@@ -189,11 +189,11 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
		)
		// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
		segmentPlans = append(segmentPlans, b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...)
		channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
		channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
	} else {
		// normal balance, find segments from largest score nodes and transfer to smallest score nodes.
		segmentPlans = append(segmentPlans, b.getNormalSegmentPlan(replica, nodesSegments)...)
		channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments), nil)...)
		channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments))...)
	}
	if len(segmentPlans) != 0 || len(channelPlans) != 0 {
		PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager, b.dist.SegmentDistManager)
@@ -586,7 +586,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
	for i, c := range cases {
		suite.Run(c.name, func() {
			if i == 0 {
				suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0)
				suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
			}
			suite.SetupSuite()
			defer suite.TearDownTest()