mirror of https://github.com/milvus-io/milvus.git
parent
f0ababb420
commit
5244020336
|
@ -245,65 +245,66 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNode
|
|||
channelPlans = append(channelPlans, plans...)
|
||||
}
|
||||
|
||||
if len(channelPlans) == 0 && len(onlineNodes) > 1 {
|
||||
// start to balance channels on all available nodes
|
||||
channels := b.dist.ChannelDistManager.GetByCollection(replica.CollectionID)
|
||||
channelsOnNode := lo.GroupBy(channels, func(channel *meta.DmChannel) int64 { return channel.Node })
|
||||
// if len(channelPlans) == 0 && len(onlineNodes) > 1 {
|
||||
// // start to balance channels on all available nodes
|
||||
// channels := b.dist.ChannelDistManager.GetByCollection(replica.CollectionID)
|
||||
// channelsOnNode := lo.GroupBy(channels, func(channel *meta.DmChannel) int64 { return channel.Node })
|
||||
|
||||
nodes := replica.GetNodes()
|
||||
getChannelNum := func(node int64) int {
|
||||
if channelsOnNode[node] == nil {
|
||||
return 0
|
||||
}
|
||||
return len(channelsOnNode[node])
|
||||
}
|
||||
sort.Slice(nodes, func(i, j int) bool { return getChannelNum(nodes[i]) < getChannelNum(nodes[j]) })
|
||||
// nodes := replica.GetNodes()
|
||||
// getChannelNum := func(node int64) int {
|
||||
// if channelsOnNode[node] == nil {
|
||||
// return 0
|
||||
// }
|
||||
// return len(channelsOnNode[node])
|
||||
// }
|
||||
// sort.Slice(nodes, func(i, j int) bool { return getChannelNum(nodes[i]) < getChannelNum(nodes[j]) })
|
||||
|
||||
start := int64(0)
|
||||
end := int64(len(nodes) - 1)
|
||||
averageChannel := len(channels) / len(onlineNodes)
|
||||
if averageChannel == 0 || getChannelNum(nodes[start]) >= getChannelNum(nodes[end]) {
|
||||
return channelPlans
|
||||
}
|
||||
// start := int64(0)
|
||||
// end := int64(len(nodes) - 1)
|
||||
|
||||
for start < end {
|
||||
// segment to move in
|
||||
targetNode := nodes[start]
|
||||
// segment to move out
|
||||
sourceNode := nodes[end]
|
||||
// averageChannel := int(math.Ceil(float64(len(channels)) / float64(len(onlineNodes))))
|
||||
// if averageChannel == 0 || getChannelNum(nodes[start]) >= getChannelNum(nodes[end]) {
|
||||
// return channelPlans
|
||||
// }
|
||||
|
||||
if len(channelsOnNode[targetNode]) >= averageChannel {
|
||||
break
|
||||
}
|
||||
// for start < end {
|
||||
// // segment to move in
|
||||
// targetNode := nodes[start]
|
||||
// // segment to move out
|
||||
// sourceNode := nodes[end]
|
||||
|
||||
// remove channel from end node
|
||||
selectChannel := channelsOnNode[sourceNode][0]
|
||||
channelsOnNode[sourceNode] = channelsOnNode[sourceNode][1:]
|
||||
// if len(channelsOnNode[sourceNode])-1 < averageChannel {
|
||||
// break
|
||||
// }
|
||||
|
||||
// add channel to start node
|
||||
if channelsOnNode[targetNode] == nil {
|
||||
channelsOnNode[targetNode] = make([]*meta.DmChannel, 0)
|
||||
}
|
||||
channelsOnNode[targetNode] = append(channelsOnNode[targetNode], selectChannel)
|
||||
// // remove channel from end node
|
||||
// selectChannel := channelsOnNode[sourceNode][0]
|
||||
// channelsOnNode[sourceNode] = channelsOnNode[sourceNode][1:]
|
||||
|
||||
// generate channel plan
|
||||
plan := ChannelAssignPlan{
|
||||
Channel: selectChannel,
|
||||
From: sourceNode,
|
||||
To: targetNode,
|
||||
ReplicaID: replica.ID,
|
||||
}
|
||||
channelPlans = append(channelPlans, plan)
|
||||
for end > 0 && getChannelNum(nodes[end]) <= averageChannel {
|
||||
end--
|
||||
}
|
||||
// // add channel to start node
|
||||
// if channelsOnNode[targetNode] == nil {
|
||||
// channelsOnNode[targetNode] = make([]*meta.DmChannel, 0)
|
||||
// }
|
||||
// channelsOnNode[targetNode] = append(channelsOnNode[targetNode], selectChannel)
|
||||
|
||||
for start < end && getChannelNum(nodes[start]) >= averageChannel {
|
||||
start++
|
||||
}
|
||||
}
|
||||
// // generate channel plan
|
||||
// plan := ChannelAssignPlan{
|
||||
// Channel: selectChannel,
|
||||
// From: sourceNode,
|
||||
// To: targetNode,
|
||||
// ReplicaID: replica.ID,
|
||||
// }
|
||||
// channelPlans = append(channelPlans, plan)
|
||||
// for end > 0 && getChannelNum(nodes[end]) <= averageChannel {
|
||||
// end--
|
||||
// }
|
||||
|
||||
}
|
||||
// for start < end && getChannelNum(nodes[start]) >= averageChannel {
|
||||
// start++
|
||||
// }
|
||||
// }
|
||||
|
||||
// }
|
||||
return channelPlans
|
||||
}
|
||||
|
||||
|
|
|
@ -244,34 +244,34 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
|||
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "balance channel",
|
||||
nodes: []int64{2, 3},
|
||||
segmentCnts: []int{2, 2},
|
||||
states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
|
||||
shouldMock: true,
|
||||
distributions: map[int64][]*meta.Segment{
|
||||
2: {
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
|
||||
},
|
||||
3: {
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
|
||||
},
|
||||
},
|
||||
distributionChannels: map[int64][]*meta.DmChannel{
|
||||
2: {
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2},
|
||||
},
|
||||
3: {},
|
||||
},
|
||||
expectPlans: []SegmentAssignPlan{},
|
||||
expectChannelPlans: []ChannelAssignPlan{
|
||||
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, From: 2, To: 3, ReplicaID: 1},
|
||||
},
|
||||
},
|
||||
// {
|
||||
// name: "balance channel",
|
||||
// nodes: []int64{2, 3},
|
||||
// segmentCnts: []int{2, 2},
|
||||
// states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
|
||||
// shouldMock: true,
|
||||
// distributions: map[int64][]*meta.Segment{
|
||||
// 2: {
|
||||
// {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
|
||||
// {SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
|
||||
// },
|
||||
// 3: {
|
||||
// {SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
|
||||
// {SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
|
||||
// },
|
||||
// },
|
||||
// distributionChannels: map[int64][]*meta.DmChannel{
|
||||
// 2: {
|
||||
// {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
|
||||
// {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2},
|
||||
// },
|
||||
// 3: {},
|
||||
// },
|
||||
// expectPlans: []SegmentAssignPlan{},
|
||||
// expectChannelPlans: []ChannelAssignPlan{
|
||||
// {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, From: 2, To: 3, ReplicaID: 1},
|
||||
// },
|
||||
// },
|
||||
{
|
||||
name: "unbalance stable view",
|
||||
nodes: []int64{1, 2, 3},
|
||||
|
@ -281,19 +281,39 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
|||
distributions: map[int64][]*meta.Segment{},
|
||||
distributionChannels: map[int64][]*meta.DmChannel{
|
||||
1: {
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 2},
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1},
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1},
|
||||
},
|
||||
2: {
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2},
|
||||
},
|
||||
3: {
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v4"}, Node: 2},
|
||||
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v4"}, Node: 3},
|
||||
},
|
||||
},
|
||||
expectPlans: []SegmentAssignPlan{},
|
||||
expectChannelPlans: []ChannelAssignPlan{},
|
||||
},
|
||||
// {
|
||||
// name: "balance unstable view",
|
||||
// nodes: []int64{1, 2, 3},
|
||||
// segmentCnts: []int{0, 0, 0},
|
||||
// states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
|
||||
// shouldMock: true,
|
||||
// distributions: map[int64][]*meta.Segment{},
|
||||
// distributionChannels: map[int64][]*meta.DmChannel{
|
||||
// 1: {
|
||||
// {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1},
|
||||
// {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1},
|
||||
// },
|
||||
// 2: {},
|
||||
// 3: {},
|
||||
// },
|
||||
// expectPlans: []SegmentAssignPlan{},
|
||||
// expectChannelPlans: []ChannelAssignPlan{
|
||||
// {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1}, From: 1, To: 2, ReplicaID: 1},
|
||||
// },
|
||||
// },
|
||||
{
|
||||
name: "already balanced",
|
||||
nodes: []int64{11, 22},
|
||||
|
|
Loading…
Reference in New Issue