From 40e5c0be708cbfd11725e4736f6068100c35a838 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 14 May 2024 11:51:32 +0800 Subject: [PATCH] fix: Enable to balance released standby channel (#32986) See also: #32879 --------- Signed-off-by: yangxuan --- internal/datacoord/channel_manager_v2.go | 6 ++-- internal/datacoord/policy.go | 31 +++++++++-------- internal/datacoord/policy_test.go | 43 +++++++++++++++++++----- 3 files changed, 54 insertions(+), 26 deletions(-) diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index 4b69161741..2d63596f1f 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -181,7 +181,7 @@ func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error { log.Info("register node", zap.Int64("registered node", nodeID)) m.store.AddNode(nodeID) - updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo().GetChannels(), m.legacyNodes.Collect()) + updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect()) if updates == nil { log.Info("register node with no reassignment", zap.Int64("registered node", nodeID)) @@ -234,7 +234,7 @@ func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error { // channel already written into meta, try to assign it to the cluster // not error is returned if failed, the assignment will retry later - updates = AvgAssignByCountPolicy(m.store.GetNodesChannels(), []RWChannel{ch}, m.legacyNodes.Collect()) + updates = AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect()) if updates == nil { return nil } @@ -284,7 +284,7 @@ func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error { m.mu.Lock() defer m.mu.Unlock() - updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), original.GetChannels(), m.legacyNodes.Collect()) + updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect()) if updates != nil { return m.execute(updates) } diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 6d6077dd82..bdb32a545e 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -363,7 +363,7 @@ func AvgBalanceChannelPolicy(cluster Assignments) *ChannelOpSet { return opSet } -func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWChannel, execlusiveNodes []int64) *ChannelOpSet { +func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInfo, execlusiveNodes []int64) *ChannelOpSet { var ( toCluster Assignments fromCluster Assignments @@ -371,18 +371,21 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC ) nodeToAvg := typeutil.NewUniqueSet() - lo.ForEach(currentCluster, func(info *NodeChannelInfo, _ int) { - if !lo.Contains(execlusiveNodes, info.NodeID) { - toCluster = append(toCluster, info) - nodeToAvg.Insert(info.NodeID) - } - - if len(info.Channels) > 0 { + // Get fromCluster + if toAssign == nil && len(info.Channels) > 0 { fromCluster = append(fromCluster, info) channelNum += len(info.Channels) nodeToAvg.Insert(info.NodeID) } + + // Get toCluster by filtering out execlusive nodes + if lo.Contains(execlusiveNodes, info.NodeID) || (toAssign != nil && info.NodeID == toAssign.NodeID) { + return + } + + toCluster = append(toCluster, info) + nodeToAvg.Insert(info.NodeID) }) // If no datanode alive, do nothing @@ -391,8 +394,8 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC } // 1. assign unassigned channels first - if len(unassignedChannels) > 0 { - chPerNode := (len(unassignedChannels) + channelNum) / nodeToAvg.Len() + if toAssign != nil && len(toAssign.Channels) > 0 { + chPerNode := (len(toAssign.Channels) + channelNum) / nodeToAvg.Len() // sort by assigned channels count ascsending sort.Slice(toCluster, func(i, j int) bool { @@ -408,7 +411,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC } updates := make(map[int64][]RWChannel) - for i, newChannel := range unassignedChannels { + for i, newChannel := range toAssign.GetChannels() { n := nodesLackOfChannels[i%len(nodesLackOfChannels)].NodeID updates[n] = append(updates[n], newChannel) } @@ -416,12 +419,12 @@ func AvgAssignByCountPolicy(currentCluster Assignments, unassignedChannels []RWC opSet := NewChannelOpSet() for id, chs := range updates { opSet.Append(id, Watch, chs...) - opSet.Delete(bufferID, chs...) + opSet.Delete(toAssign.NodeID, chs...) } log.Info("Assign channels to nodes by channel count", - zap.Int("channel count", len(unassignedChannels)), - zap.Int("cluster count", len(toCluster)), + zap.Int("toAssign channel count", len(toAssign.Channels)), + zap.Any("original nodeID", toAssign.NodeID), zap.Int64s("exclusive nodes", execlusiveNodes), zap.Any("operations", opSet), zap.Int64s("nodesLackOfChannels", lo.Map(nodesLackOfChannels, func(info *NodeChannelInfo, _ int) int64 { diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index 85fa80b62b..5d9d254d5f 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -519,9 +519,7 @@ func (s *AssignByCountPolicySuite) TestWithoutUnassignedChannels() { func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { s.Run("one unassigned channel", func() { - unassigned := []RWChannel{ - getChannel("new-ch-1", 1), - } + unassigned := NewNodeChannelInfo(bufferID, getChannel("new-ch-1", 1)) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil) s.NotNil(opSet) @@ -537,11 +535,11 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { }) s.Run("three unassigned channel", func() { - unassigned := []RWChannel{ + unassigned := NewNodeChannelInfo(bufferID, getChannel("new-ch-1", 1), getChannel("new-ch-2", 1), getChannel("new-ch-3", 1), - } + ) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil) s.NotNil(opSet) @@ -561,11 +559,11 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { }) s.Run("three unassigned channel with execlusiveNodes", func() { - unassigned := []RWChannel{ + unassigned := NewNodeChannelInfo(bufferID, getChannel("new-ch-1", 1), getChannel("new-ch-2", 1), getChannel("new-ch-3", 1), - } + ) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, []int64{1, 2}) s.NotNil(opSet) @@ -584,13 +582,13 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { s.ElementsMatch([]int64{3}, nodeIDs) }) s.Run("67 unassigned with 33 in node1, none in node2,3", func() { - var unassigned []RWChannel + var unassignedChannels []RWChannel m1 := make(map[string]int64) for i := 0; i < 33; i++ { m1[fmt.Sprintf("ch-%d", i)] = 1 } for i := 33; i < 100; i++ { - unassigned = append(unassigned, getChannel(fmt.Sprintf("ch-%d", i), 1)) + unassignedChannels = append(unassignedChannels, getChannel(fmt.Sprintf("ch-%d", i), 1)) } s.curCluster = []*NodeChannelInfo{ {NodeID: 1, Channels: getChannels(m1)}, @@ -598,6 +596,7 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { {NodeID: 3, Channels: map[string]RWChannel{}}, } + unassigned := NewNodeChannelInfo(bufferID, unassignedChannels...) opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, nil) s.NotNil(opSet) @@ -614,4 +613,30 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { }) s.ElementsMatch([]int64{3, 2}, nodeIDs) }) + + s.Run("toAssign from nodeID = 1", func() { + var unassigned *NodeChannelInfo + for _, info := range s.curCluster { + if info.NodeID == int64(1) { + unassigned = info + } + } + s.Require().NotNil(unassigned) + + opSet := AvgAssignByCountPolicy(s.curCluster, unassigned, []int64{1, 2}) + s.NotNil(opSet) + + s.Equal(3, opSet.GetChannelNumber()) + for _, op := range opSet.Collect() { + if op.NodeID == int64(1) { + s.Equal(Delete, op.Type) + } + } + s.Equal(2, opSet.Len()) + + nodeIDs := lo.FilterMap(opSet.Collect(), func(op *ChannelOp, _ int) (int64, bool) { + return op.NodeID, true + }) + s.ElementsMatch([]int64{3, 1}, nodeIDs) + }) }