fix: Enable balancing of released standby channels (#32986)

See also: #32879

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/32881/head^2
XuanYang-cn 2024-05-14 11:51:32 +08:00 committed by GitHub
parent dc058eaf61
commit 40e5c0be70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 54 additions and 26 deletions

View File

@ -181,7 +181,7 @@ func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error {
log.Info("register node", zap.Int64("registered node", nodeID)) log.Info("register node", zap.Int64("registered node", nodeID))
m.store.AddNode(nodeID) m.store.AddNode(nodeID)
updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo().GetChannels(), m.legacyNodes.Collect()) updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
if updates == nil { if updates == nil {
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID)) log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))
@ -234,7 +234,7 @@ func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error {
// channel already written into meta, try to assign it to the cluster // channel already written into meta, try to assign it to the cluster
// no error is returned on failure; the assignment will be retried later // no error is returned on failure; the assignment will be retried later
updates = AvgAssignByCountPolicy(m.store.GetNodesChannels(), []RWChannel{ch}, m.legacyNodes.Collect()) updates = AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
if updates == nil { if updates == nil {
return nil return nil
} }
@ -284,7 +284,7 @@ func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), original.GetChannels(), m.legacyNodes.Collect()) updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
if updates != nil { if updates != nil {
return m.execute(updates) return m.execute(updates)
} }

View File

@ -363,7 +363,7 @@ func AvgBalanceChannelPolicy(cluster Assignments) *ChannelOpSet {
return opSet return opSet
} }
func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWChannel, execlusiveNodes []int64) *ChannelOpSet { func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInfo, execlusiveNodes []int64) *ChannelOpSet {
var ( var (
toCluster Assignments toCluster Assignments
fromCluster Assignments fromCluster Assignments
@ -371,18 +371,21 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC
) )
nodeToAvg := typeutil.NewUniqueSet() nodeToAvg := typeutil.NewUniqueSet()
lo.ForEach(currentCluster, func(info *NodeChannelInfo, _ int) { lo.ForEach(currentCluster, func(info *NodeChannelInfo, _ int) {
if !lo.Contains(execlusiveNodes, info.NodeID) { // Get fromCluster
toCluster = append(toCluster, info) if toAssign == nil && len(info.Channels) > 0 {
nodeToAvg.Insert(info.NodeID)
}
if len(info.Channels) > 0 {
fromCluster = append(fromCluster, info) fromCluster = append(fromCluster, info)
channelNum += len(info.Channels) channelNum += len(info.Channels)
nodeToAvg.Insert(info.NodeID) nodeToAvg.Insert(info.NodeID)
} }
// Get toCluster by filtering out exclusive nodes
if lo.Contains(execlusiveNodes, info.NodeID) || (toAssign != nil && info.NodeID == toAssign.NodeID) {
return
}
toCluster = append(toCluster, info)
nodeToAvg.Insert(info.NodeID)
}) })
// If no datanode alive, do nothing // If no datanode alive, do nothing
@ -391,8 +394,8 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC
} }
// 1. assign unassigned channels first // 1. assign unassigned channels first
if len(unassignedChannels) > 0 { if toAssign != nil && len(toAssign.Channels) > 0 {
chPerNode := (len(unassignedChannels) + channelNum) / nodeToAvg.Len() chPerNode := (len(toAssign.Channels) + channelNum) / nodeToAvg.Len()
// sort by assigned channels count ascending // sort by assigned channels count ascending
sort.Slice(toCluster, func(i, j int) bool { sort.Slice(toCluster, func(i, j int) bool {
@ -408,7 +411,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC
} }
updates := make(map[int64][]RWChannel) updates := make(map[int64][]RWChannel)
for i, newChannel := range unassignedChannels { for i, newChannel := range toAssign.GetChannels() {
n := nodesLackOfChannels[i%len(nodesLackOfChannels)].NodeID n := nodesLackOfChannels[i%len(nodesLackOfChannels)].NodeID
updates[n] = append(updates[n], newChannel) updates[n] = append(updates[n], newChannel)
} }
@ -416,12 +419,12 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC
opSet := NewChannelOpSet() opSet := NewChannelOpSet()
for id, chs := range updates { for id, chs := range updates {
opSet.Append(id, Watch, chs...) opSet.Append(id, Watch, chs...)
opSet.Delete(bufferID, chs...) opSet.Delete(toAssign.NodeID, chs...)
} }
log.Info("Assign channels to nodes by channel count", log.Info("Assign channels to nodes by channel count",
zap.Int("channel count", len(unassignedChannels)), zap.Int("toAssign channel count", len(toAssign.Channels)),
zap.Int("cluster count", len(toCluster)), zap.Any("original nodeID", toAssign.NodeID),
zap.Int64s("exclusive nodes", execlusiveNodes), zap.Int64s("exclusive nodes", execlusiveNodes),
zap.Any("operations", opSet), zap.Any("operations", opSet),
zap.Int64s("nodesLackOfChannels", lo.Map(nodesLackOfChannels, func(info *NodeChannelInfo, _ int) int64 { zap.Int64s("nodesLackOfChannels", lo.Map(nodesLackOfChannels, func(info *NodeChannelInfo, _ int) int64 {

View File

@ -519,9 +519,7 @@ func (s *AssignByCountPolicySuite) TestWithoutUnassignedChannels() {
func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
s.Run("one unassigned channel", func() { s.Run("one unassigned channel", func() {
unassigned := []RWChannel{ unassigned := NewNodeChannelInfo(bufferID, getChannel("new-ch-1", 1))
getChannel("new-ch-1", 1),
}
opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil)
s.NotNil(opSet) s.NotNil(opSet)
@ -537,11 +535,11 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
}) })
s.Run("three unassigned channel", func() { s.Run("three unassigned channel", func() {
unassigned := []RWChannel{ unassigned := NewNodeChannelInfo(bufferID,
getChannel("new-ch-1", 1), getChannel("new-ch-1", 1),
getChannel("new-ch-2", 1), getChannel("new-ch-2", 1),
getChannel("new-ch-3", 1), getChannel("new-ch-3", 1),
} )
opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil)
s.NotNil(opSet) s.NotNil(opSet)
@ -561,11 +559,11 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
}) })
s.Run("three unassigned channel with execlusiveNodes", func() { s.Run("three unassigned channel with execlusiveNodes", func() {
unassigned := []RWChannel{ unassigned := NewNodeChannelInfo(bufferID,
getChannel("new-ch-1", 1), getChannel("new-ch-1", 1),
getChannel("new-ch-2", 1), getChannel("new-ch-2", 1),
getChannel("new-ch-3", 1), getChannel("new-ch-3", 1),
} )
opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, []int64{1, 2}) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, []int64{1, 2})
s.NotNil(opSet) s.NotNil(opSet)
@ -584,13 +582,13 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
s.ElementsMatch([]int64{3}, nodeIDs) s.ElementsMatch([]int64{3}, nodeIDs)
}) })
s.Run("67 unassigned with 33 in node1, none in node2,3", func() { s.Run("67 unassigned with 33 in node1, none in node2,3", func() {
var unassigned []RWChannel var unassignedChannels []RWChannel
m1 := make(map[string]int64) m1 := make(map[string]int64)
for i := 0; i < 33; i++ { for i := 0; i < 33; i++ {
m1[fmt.Sprintf("ch-%d", i)] = 1 m1[fmt.Sprintf("ch-%d", i)] = 1
} }
for i := 33; i < 100; i++ { for i := 33; i < 100; i++ {
unassigned = append(unassigned, getChannel(fmt.Sprintf("ch-%d", i), 1)) unassignedChannels = append(unassignedChannels, getChannel(fmt.Sprintf("ch-%d", i), 1))
} }
s.curCluster = []*NodeChannelInfo{ s.curCluster = []*NodeChannelInfo{
{NodeID: 1, Channels: getChannels(m1)}, {NodeID: 1, Channels: getChannels(m1)},
@ -598,6 +596,7 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
{NodeID: 3, Channels: map[string]RWChannel{}}, {NodeID: 3, Channels: map[string]RWChannel{}},
} }
unassigned := NewNodeChannelInfo(bufferID, unassignedChannels...)
opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil)
s.NotNil(opSet) s.NotNil(opSet)
@ -614,4 +613,30 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
}) })
s.ElementsMatch([]int64{3, 2}, nodeIDs) s.ElementsMatch([]int64{3, 2}, nodeIDs)
}) })
s.Run("toAssign from nodeID = 1", func() {
var unassigned *NodeChannelInfo
for _, info := range s.curCluster {
if info.NodeID == int64(1) {
unassigned = info
}
}
s.Require().NotNil(unassigned)
opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, []int64{1, 2})
s.NotNil(opSet)
s.Equal(3, opSet.GetChannelNumber())
for _, op := range opSet.Collect() {
if op.NodeID == int64(1) {
s.Equal(Delete, op.Type)
}
}
s.Equal(2, opSet.Len())
nodeIDs := lo.FilterMap(opSet.Collect(), func(op *ChannelOp, _ int) (int64, bool) {
return op.NodeID, true
})
s.ElementsMatch([]int64{3, 1}, nodeIDs)
})
} }