package meta import ( "sort" "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // NilReplica is used to represent a nil replica. var NilReplica = newReplica(&querypb.Replica{ ID: -1, }) // Replica is a immutable type for manipulating replica meta info for replica manager. // Performed a copy-on-write strategy to keep the consistency of the replica manager. // So only read only operations are allowed on these type. type Replica struct { replicaPB *querypb.Replica rwNodes typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field. // always keep consistent with replicaPB.Nodes. // mutual exclusive with roNodes. roNodes typeutil.UniqueSet // a helper field for manipulating replica's RO Nodes slice field. // always keep consistent with replicaPB.RoNodes. // node used by replica but cannot add more channel or segment ont it. // include rebalance node or node out of resource group. } // Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead. func NewReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica { r := proto.Clone(replica).(*querypb.Replica) // TODO: nodes is a bad parameter, break the consistency, should be removed in future. // keep it for old unittest. if len(nodes) > 0 && len(replica.Nodes) == 0 && nodes[0].Len() > 0 { r.Nodes = nodes[0].Collect() } return newReplica(r) } // newReplica creates a new replica from pb. func newReplica(replica *querypb.Replica) *Replica { return &Replica{ replicaPB: proto.Clone(replica).(*querypb.Replica), rwNodes: typeutil.NewUniqueSet(replica.Nodes...), roNodes: typeutil.NewUniqueSet(replica.RoNodes...), } } // GetID returns the id of the replica. 
func (replica *Replica) GetID() typeutil.UniqueID {
	return replica.replicaPB.GetID()
}

// GetCollectionID returns the collection id of the replica.
func (replica *Replica) GetCollectionID() typeutil.UniqueID {
	return replica.replicaPB.GetCollectionID()
}

// GetResourceGroup returns the resource group name of the replica.
func (replica *Replica) GetResourceGroup() string {
	return replica.replicaPB.GetResourceGroup()
}

// GetNodes returns all nodes of the replica: the ro nodes followed by the rw nodes.
// The result is a freshly allocated slice and does not alias the replica's meta.
func (replica *Replica) GetNodes() []int64 {
	roNodes := replica.replicaPB.GetRoNodes()
	rwNodes := replica.replicaPB.GetNodes()
	// Pre-size so the two appends never reallocate.
	nodes := make([]int64, 0, len(roNodes)+len(rwNodes))
	nodes = append(nodes, roNodes...)
	return append(nodes, rwNodes...)
}

// GetRONodes returns the ro nodes of the replica.
// readonly, don't modify the returned slice.
func (replica *Replica) GetRONodes() []int64 {
	return replica.replicaPB.GetRoNodes()
}

// GetRWNodes returns the rw nodes of the replica.
// readonly, don't modify the returned slice.
func (replica *Replica) GetRWNodes() []int64 {
	return replica.replicaPB.GetNodes()
}

// RangeOverRWNodes iterates over the rw nodes of the replica.
func (replica *Replica) RangeOverRWNodes(f func(node int64) bool) {
	replica.rwNodes.Range(f)
}

// RangeOverRONodes iterates over the ro nodes of the replica.
func (replica *Replica) RangeOverRONodes(f func(node int64) bool) {
	replica.roNodes.Range(f)
}

// RWNodesCount returns the count of rw nodes of the replica.
func (replica *Replica) RWNodesCount() int {
	return replica.rwNodes.Len()
}

// RONodesCount returns the count of ro nodes of the replica.
func (replica *Replica) RONodesCount() int {
	return replica.roNodes.Len()
}

// NodesCount returns the count of rw nodes and ro nodes of the replica.
func (replica *Replica) NodesCount() int {
	return replica.rwNodes.Len() + replica.roNodes.Len()
}

// Contains checks if the node is in either the rw nodes or the ro nodes of the replica.
func (replica *Replica) Contains(node int64) bool {
	return replica.ContainRONode(node) || replica.ContainRWNode(node)
}

// ContainRONode checks if the node is in ro nodes of the replica.
func (replica *Replica) ContainRONode(node int64) bool {
	return replica.roNodes.Contain(node)
}

// ContainRWNode checks if the node is in rw nodes of the replica.
func (replica *Replica) ContainRWNode(node int64) bool {
	return replica.rwNodes.Contain(node)
}

// AddRWNode moves the nodes into the rw nodes of the replica, removing them from the ro nodes.
//
// Deprecated: Warning, break the consistency of ReplicaManager, use `SetAvailableNodesInSameCollectionAndRG` in ReplicaManager instead.
// TODO: removed in future, only for old unittest now.
func (replica *Replica) AddRWNode(nodes ...int64) {
	replica.roNodes.Remove(nodes...)
	replica.replicaPB.RoNodes = replica.roNodes.Collect()
	replica.rwNodes.Insert(nodes...)
	replica.replicaPB.Nodes = replica.rwNodes.Collect()
}

// GetChannelRWNodes returns the rw nodes exclusively assigned to the given channel,
// or nil when the channel is unknown or has no exclusive rw nodes.
// readonly, don't modify the returned slice.
func (replica *Replica) GetChannelRWNodes(channelName string) []int64 {
	// Generated protobuf getters are nil-safe, so a missing map entry yields nil here.
	nodes := replica.replicaPB.GetChannelNodeInfos()[channelName].GetRwNodes()
	if len(nodes) == 0 {
		return nil
	}
	return nodes
}

// CopyForWrite returns a mutable copy of the replica for write operations; the receiver is left untouched.
func (replica *Replica) CopyForWrite() *mutableReplica { exclusiveRWNodeToChannel := make(map[int64]string) for name, channelNodeInfo := range replica.replicaPB.GetChannelNodeInfos() { for _, nodeID := range channelNodeInfo.GetRwNodes() { exclusiveRWNodeToChannel[nodeID] = name } } return &mutableReplica{ Replica: &Replica{ replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica), rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...), roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...), }, exclusiveRWNodeToChannel: exclusiveRWNodeToChannel, } } func (replica *Replica) IsChannelExclusiveModeEnabled() bool { return replica.replicaPB.ChannelNodeInfos != nil && len(replica.replicaPB.ChannelNodeInfos) > 0 } // mutableReplica is a mutable type (COW) for manipulating replica meta info for replica manager. type mutableReplica struct { *Replica exclusiveRWNodeToChannel map[int64]string } // SetResourceGroup sets the resource group name of the replica. func (replica *mutableReplica) SetResourceGroup(resourceGroup string) { replica.replicaPB.ResourceGroup = resourceGroup } // AddRWNode adds the node to rw nodes of the replica. func (replica *mutableReplica) AddRWNode(nodes ...int64) { replica.Replica.AddRWNode(nodes...) // try to update node's assignment between channels replica.tryBalanceNodeForChannel() } // AddRONode moves the node from rw nodes to ro nodes of the replica. // only used in replica manager. func (replica *mutableReplica) AddRONode(nodes ...int64) { replica.rwNodes.Remove(nodes...) replica.replicaPB.Nodes = replica.rwNodes.Collect() replica.roNodes.Insert(nodes...) replica.replicaPB.RoNodes = replica.roNodes.Collect() // remove node from channel's exclusive list replica.removeChannelExclusiveNodes(nodes...) // try to update node's assignment between channels replica.tryBalanceNodeForChannel() } // RemoveNode removes the node from rw nodes and ro nodes of the replica. // only used in replica manager. 
func (replica *mutableReplica) RemoveNode(nodes ...int64) { replica.roNodes.Remove(nodes...) replica.replicaPB.RoNodes = replica.roNodes.Collect() replica.rwNodes.Remove(nodes...) replica.replicaPB.Nodes = replica.rwNodes.Collect() // remove node from channel's exclusive list replica.removeChannelExclusiveNodes(nodes...) // try to update node's assignment between channels replica.tryBalanceNodeForChannel() } func (replica *mutableReplica) removeChannelExclusiveNodes(nodes ...int64) { channelNodeMap := make(map[string][]int64) for _, nodeID := range nodes { channelName, ok := replica.exclusiveRWNodeToChannel[nodeID] if ok { if channelNodeMap[channelName] == nil { channelNodeMap[channelName] = make([]int64, 0) } channelNodeMap[channelName] = append(channelNodeMap[channelName], nodeID) } delete(replica.exclusiveRWNodeToChannel, nodeID) } for channelName, nodeIDs := range channelNodeMap { channelNodeInfo, ok := replica.replicaPB.ChannelNodeInfos[channelName] if ok { channelUsedNodes := typeutil.NewUniqueSet() channelUsedNodes.Insert(channelNodeInfo.GetRwNodes()...) channelUsedNodes.Remove(nodeIDs...) 
replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = channelUsedNodes.Collect() } } } func (replica *mutableReplica) TryEnableChannelExclusiveMode(channelNames ...string) { if replica.replicaPB.ChannelNodeInfos == nil { replica.replicaPB.ChannelNodeInfos = make(map[string]*querypb.ChannelNodeInfo) for _, channelName := range channelNames { replica.replicaPB.ChannelNodeInfos[channelName] = &querypb.ChannelNodeInfo{} } } if replica.exclusiveRWNodeToChannel == nil { replica.exclusiveRWNodeToChannel = make(map[int64]string) } } func (replica *mutableReplica) DisableChannelExclusiveMode() { if replica.replicaPB.ChannelNodeInfos != nil { channelNodeInfos := make(map[string]*querypb.ChannelNodeInfo) for channelName := range replica.replicaPB.ChannelNodeInfos { channelNodeInfos[channelName] = &querypb.ChannelNodeInfo{} } replica.replicaPB.ChannelNodeInfos = channelNodeInfos } replica.exclusiveRWNodeToChannel = make(map[int64]string) } // tryBalanceNodeForChannel attempts to balance nodes across channels using an improved algorithm func (replica *mutableReplica) tryBalanceNodeForChannel() { channelNodeInfos := replica.replicaPB.GetChannelNodeInfos() if len(channelNodeInfos) == 0 { return } // Check if channel exclusive mode should be enabled if !replica.shouldEnableChannelExclusiveMode(channelNodeInfos) { replica.DisableChannelExclusiveMode() return } // Calculate optimal node assignments targetAssignments := replica.calculateOptimalAssignments(channelNodeInfos) // Apply the rebalancing with minimal node movement replica.rebalanceChannelNodes(channelNodeInfos, targetAssignments) } // shouldEnableChannelExclusiveMode determines if channel exclusive mode should be enabled func (replica *mutableReplica) shouldEnableChannelExclusiveMode(channelInfos map[string]*querypb.ChannelNodeInfo) bool { balancePolicy := paramtable.Get().QueryCoordCfg.Balancer.GetValue() channelExclusiveFactor := paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.GetAsInt() return balancePolicy 
== ChannelLevelScoreBalancerName && replica.RWNodesCount() >= len(channelInfos)*channelExclusiveFactor } // calculateOptimalAssignments calculates the optimal node count for each channel func (replica *mutableReplica) calculateOptimalAssignments(channelInfos map[string]*querypb.ChannelNodeInfo) map[string]int { channelCount := len(channelInfos) totalNodes := replica.RWNodesCount() // Get channels sorted by current node count (descending) sortedChannels := replica.getSortedChannelsByNodeCount(channelInfos) // Calculate base assignment: average nodes per channel assignments := make(map[string]int, channelCount) baseNodes := totalNodes / channelCount extraNodes := totalNodes % channelCount // Distribute extra nodes to channels with fewer current nodes first for i, channel := range sortedChannels { nodeCount := baseNodes if i < extraNodes { nodeCount++ } assignments[channel] = nodeCount } return assignments } // getSortedChannelsByNodeCount returns channels sorted by current node count (descending) func (replica *mutableReplica) getSortedChannelsByNodeCount(channelInfos map[string]*querypb.ChannelNodeInfo) []string { // channelNodeAssignment represents a channel's node assignment type channelNodeAssignment struct { name string nodes []int64 } assignments := make([]channelNodeAssignment, 0, len(channelInfos)) for name, channelNodeInfo := range channelInfos { assignments = append(assignments, channelNodeAssignment{ name: name, nodes: channelNodeInfo.GetRwNodes(), }) } // Sort by node count (descending) to prioritize channels with more nodes for reduction sort.Slice(assignments, func(i, j int) bool { return len(assignments[i].nodes) > len(assignments[j].nodes) }) channels := make([]string, len(assignments)) for i, assignment := range assignments { channels[i] = assignment.name } return channels } // rebalanceChannelNodes performs the actual node rebalancing func (replica *mutableReplica) rebalanceChannelNodes(channelInfos map[string]*querypb.ChannelNodeInfo, 
targetAssignments map[string]int) { // Phase 1: Release excess nodes from over-allocated channels replica.releaseExcessNodes(channelInfos, targetAssignments) // Phase 2: Allocate nodes to under-allocated channels replica.allocateInsufficientNodes(channelInfos, targetAssignments) } // releaseExcessNodes releases nodes from channels that have more than their target allocation func (replica *mutableReplica) releaseExcessNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) { for channelName, channelNodeInfo := range channelInfos { currentNodes := channelNodeInfo.GetRwNodes() targetCount := targetAssignments[channelName] if len(currentNodes) > targetCount { // Keep the first targetCount nodes, release the rest replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = currentNodes[:targetCount] // Remove released nodes from the exclusive mapping for _, nodeID := range currentNodes[targetCount:] { delete(replica.exclusiveRWNodeToChannel, nodeID) } } } } // allocateInsufficientNodes allocates nodes to channels that need more nodes func (replica *mutableReplica) allocateInsufficientNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) { // Get available nodes (not exclusively assigned to any channel) availableNodes := replica.getAvailableNodes() for channelName, channelNodeInfo := range channelInfos { currentNodes := channelNodeInfo.GetRwNodes() targetCount := targetAssignments[channelName] if len(currentNodes) < targetCount { neededCount := targetCount - len(currentNodes) allocatedNodes := replica.allocateNodesFromPool(availableNodes, neededCount, channelName) // Update channel's node list updatedNodes := make([]int64, 0, len(currentNodes)+len(allocatedNodes)) updatedNodes = append(updatedNodes, currentNodes...) updatedNodes = append(updatedNodes, allocatedNodes...) 
replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = updatedNodes } log.Info("channel exclusive node list", zap.String("channelName", channelName), zap.Int64s("nodes", replica.replicaPB.ChannelNodeInfos[channelName].RwNodes)) } } // getAvailableNodes returns nodes that are not exclusively assigned to any channel func (replica *mutableReplica) getAvailableNodes() []int64 { allNodes := replica.rwNodes.Collect() availableNodes := make([]int64, 0, len(allNodes)) for _, nodeID := range allNodes { if _, isExclusive := replica.exclusiveRWNodeToChannel[nodeID]; !isExclusive { availableNodes = append(availableNodes, nodeID) } } return availableNodes } // allocateNodesFromPool allocates nodes from the available pool to a channel func (replica *mutableReplica) allocateNodesFromPool(availableNodes []int64, neededCount int, channelName string) []int64 { allocatedCount := 0 allocatedNodes := make([]int64, 0, neededCount) for _, nodeID := range availableNodes { if allocatedCount >= neededCount { break } // Check if node is still available (not assigned since we got the list) if _, isExclusive := replica.exclusiveRWNodeToChannel[nodeID]; !isExclusive { allocatedNodes = append(allocatedNodes, nodeID) replica.exclusiveRWNodeToChannel[nodeID] = channelName allocatedCount++ } } return allocatedNodes } // IntoReplica returns the immutable replica, After calling this method, the mutable replica should not be used again. func (replica *mutableReplica) IntoReplica() *Replica { r := replica.Replica replica.Replica = nil return r }