mirror of https://github.com/milvus-io/milvus.git
enhance: Solve channel unbalance on datanode (#34984)
issue: #33583 the old policy permit datanode has at most 2 more channels than other datanode. so if milvus has 2 datanode and 2 channels, both 2 channels will be assign to 1 datanode, left another datanode empty. This PR refine the balance policy to solve channel unbalance on datanode --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/35031/head
parent
94d8f48b85
commit
6e9fbd1630
|
@ -89,17 +89,32 @@ func AvgBalanceChannelPolicy(cluster Assignments) *ChannelOpSet {
|
|||
totalChannelNum += len(nodeChs.Channels)
|
||||
}
|
||||
channelCountPerNode := totalChannelNum / avaNodeNum
|
||||
maxChannelCountPerNode := channelCountPerNode
|
||||
remainder := totalChannelNum % avaNodeNum
|
||||
if remainder > 0 {
|
||||
maxChannelCountPerNode += 1
|
||||
}
|
||||
for _, nChannels := range cluster {
|
||||
chCount := len(nChannels.Channels)
|
||||
if chCount <= channelCountPerNode+1 {
|
||||
if chCount == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
toReleaseCount := chCount - channelCountPerNode
|
||||
if remainder > 0 && chCount >= maxChannelCountPerNode {
|
||||
remainder -= 1
|
||||
toReleaseCount = chCount - maxChannelCountPerNode
|
||||
}
|
||||
|
||||
if toReleaseCount == 0 {
|
||||
log.Info("node channel count is not much larger than average, skip reallocate",
|
||||
zap.Int64("nodeID", nChannels.NodeID),
|
||||
zap.Int("channelCount", chCount),
|
||||
zap.Int("channelCountPerNode", channelCountPerNode))
|
||||
continue
|
||||
}
|
||||
|
||||
reallocate := NewNodeChannelInfo(nChannels.NodeID)
|
||||
toReleaseCount := chCount - channelCountPerNode - 1
|
||||
for _, ch := range nChannels.Channels {
|
||||
reallocate.AddChannel(ch)
|
||||
toReleaseCount--
|
||||
|
@ -144,6 +159,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInf
|
|||
fromCluster = append(fromCluster, info)
|
||||
channelNum += len(info.Channels)
|
||||
nodeToAvg.Insert(info.NodeID)
|
||||
return
|
||||
}
|
||||
|
||||
// Get toCluster by filtering out execlusive nodes
|
||||
|
@ -152,6 +168,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInf
|
|||
}
|
||||
|
||||
toCluster = append(toCluster, info)
|
||||
channelNum += len(info.Channels)
|
||||
nodeToAvg.Insert(info.NodeID)
|
||||
})
|
||||
|
||||
|
|
|
@ -71,16 +71,16 @@ func (s *PolicySuite) TestAvgBalanceChannelPolicy() {
|
|||
s.Nil(opSet)
|
||||
})
|
||||
s.Run("test uneven with conservative effect", func() {
|
||||
// as we deem that the node having only one channel more than average as even, so there's no reallocation
|
||||
// for this test case
|
||||
// even distribution should have not results
|
||||
uneven := []*NodeChannelInfo{
|
||||
{100, getChannels(map[string]int64{"ch1": 1, "ch2": 1})},
|
||||
{NodeID: 101},
|
||||
}
|
||||
|
||||
opSet := AvgBalanceChannelPolicy(uneven)
|
||||
s.Nil(opSet)
|
||||
s.Equal(opSet.Len(), 1)
|
||||
for _, op := range opSet.Collect() {
|
||||
s.True(lo.Contains([]string{"ch1", "ch2"}, op.GetChannelNames()[0]))
|
||||
}
|
||||
})
|
||||
s.Run("test uneven with zero", func() {
|
||||
uneven := []*NodeChannelInfo{
|
||||
|
@ -286,4 +286,31 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
|
|||
})
|
||||
s.ElementsMatch([]int64{3, 1}, nodeIDs)
|
||||
})
|
||||
|
||||
s.Run("assign to reach average", func() {
|
||||
curCluster := []*NodeChannelInfo{
|
||||
{1, getChannels(map[string]int64{"ch-1": 1, "ch-2": 1, "ch-3": 1})},
|
||||
{2, getChannels(map[string]int64{"ch-4": 1, "ch-5": 1, "ch-6": 4, "ch-7": 4, "ch-8": 4})},
|
||||
}
|
||||
unassigned := NewNodeChannelInfo(bufferID,
|
||||
getChannel("new-ch-1", 1),
|
||||
getChannel("new-ch-2", 1),
|
||||
getChannel("new-ch-3", 1),
|
||||
)
|
||||
|
||||
opSet := AvgAssignByCountPolicy(curCluster, unassigned, nil)
|
||||
s.NotNil(opSet)
|
||||
|
||||
s.Equal(3, opSet.GetChannelNumber())
|
||||
s.Equal(2, opSet.Len())
|
||||
for _, op := range opSet.Collect() {
|
||||
if op.Type == Delete {
|
||||
s.Equal(int64(bufferID), op.NodeID)
|
||||
}
|
||||
|
||||
if op.Type == Watch {
|
||||
s.Equal(int64(1), op.NodeID)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue