milvus/internal/datacoord/policy.go

360 lines
12 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"sort"
"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// filterNode returns the entries of infos whose NodeID differs from nodeID,
// keeping them in their original order. The returned slice is freshly
// allocated and never nil, even when nothing is kept.
func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo {
	kept := make([]*NodeChannelInfo, 0)
	for i := range infos {
		if candidate := infos[i]; candidate.NodeID != nodeID {
			kept = append(kept, candidate)
		}
	}
	return kept
}
// Assignments is a list of NodeChannelInfo entries, each carrying a node ID
// and the channels recorded for that node.
type Assignments []*NodeChannelInfo
// GetChannelCount returns the number of channels held by the first entry whose
// NodeID equals nodeID, or 0 when no entry matches.
func (a Assignments) GetChannelCount(nodeID int64) int {
	for idx := range a {
		if entry := a[idx]; entry.NodeID == nodeID {
			return len(entry.Channels)
		}
	}
	return 0
}
// MarshalLogArray writes the assignments into a zap array encoder so that an
// Assignments value can be logged as a structured array.
//
// For every entry it appends three elements to enc: the literal "nodeID:", the
// node ID, and the node's channel names rendered as "[name1, name2]" ("[]"
// when the node holds no channels). Channel names are sorted so the rendered
// list does not depend on the iteration order of Channels and is stable from
// one log line to the next. It always returns nil.
func (a Assignments) MarshalLogArray(enc zapcore.ArrayEncoder) error {
	for _, info := range a {
		enc.AppendString("nodeID:")
		enc.AppendInt64(info.NodeID)

		names := make([]string, 0, len(info.Channels))
		size := 2 // opening and closing bracket
		for _, ch := range info.Channels {
			name := ch.GetName()
			names = append(names, name)
			size += len(name) + 2 // name plus ", " separator
		}
		sort.Strings(names)

		// Build the list in one pre-sized buffer instead of repeated string
		// concatenation, which re-copies the whole string on every step.
		buf := make([]byte, 0, size)
		buf = append(buf, '[')
		for i, name := range names {
			if i > 0 {
				buf = append(buf, ", "...)
			}
			buf = append(buf, name...)
		}
		buf = append(buf, ']')
		enc.AppendString(string(buf))
	}
	return nil
}
// BalanceChannelPolicy tries to balance watched channels across registered
// nodes. It receives the current per-node assignments and returns the set of
// channel operations to apply.
type BalanceChannelPolicy func(cluster Assignments) *ChannelOpSet
// EmptyBalancePolicy is a dummy balance policy: it ignores cluster and always
// returns nil, i.e. it never proposes any channel operation.
func EmptyBalancePolicy(cluster Assignments) *ChannelOpSet {
return nil
}
// AvgBalanceChannelPolicy tries to balance channels evenly across the nodes in
// cluster by releasing the surplus held by overloaded nodes.
//
// With T channels in total over N nodes, every node may keep T/N channels and
// T%N nodes may keep one more. Those extra slots are granted, in cluster
// order, to nodes that already hold at least T/N+1 channels. A node that holds
// no more than its allowance is left untouched; in particular a node holding
// fewer channels than the average never releases anything.
//
// It returns a ChannelOpSet containing one Release operation per overloaded
// node, or nil when cluster is empty or no node has a surplus.
func AvgBalanceChannelPolicy(cluster Assignments) *ChannelOpSet {
	avaNodeNum := len(cluster)
	if avaNodeNum == 0 {
		return nil
	}

	totalChannelNum := 0
	for _, nodeChs := range cluster {
		totalChannelNum += len(nodeChs.Channels)
	}

	channelCountPerNode := totalChannelNum / avaNodeNum
	maxChannelCountPerNode := channelCountPerNode
	remainder := totalChannelNum % avaNodeNum
	if remainder > 0 {
		maxChannelCountPerNode++
	}

	reAllocations := make(Assignments, 0, avaNodeNum)
	for _, nChannels := range cluster {
		chCount := len(nChannels.Channels)
		if chCount == 0 {
			continue
		}

		toReleaseCount := chCount - channelCountPerNode
		if remainder > 0 && chCount >= maxChannelCountPerNode {
			// This node consumes one of the "one extra channel" slots.
			remainder--
			toReleaseCount = chCount - maxChannelCountPerNode
		}

		// toReleaseCount is negative for nodes below the average. Such nodes
		// must be skipped as well, otherwise the loop below would still pick
		// one channel before hitting its termination check.
		if toReleaseCount <= 0 {
			log.Info("node channel count is not much larger than average, skip reallocate",
				zap.Int64("nodeID", nChannels.NodeID),
				zap.Int("channelCount", chCount),
				zap.Int("channelCountPerNode", channelCountPerNode))
			continue
		}

		// Pick the first toReleaseCount channels in iteration order.
		reallocate := NewNodeChannelInfo(nChannels.NodeID)
		for _, ch := range nChannels.Channels {
			reallocate.AddChannel(ch)
			toReleaseCount--
			if toReleaseCount <= 0 {
				break
			}
		}
		reAllocations = append(reAllocations, reallocate)
	}

	if len(reAllocations) == 0 {
		return nil
	}

	opSet := NewChannelOpSet()
	for _, reAlloc := range reAllocations {
		opSet.Append(reAlloc.NodeID, Release, lo.Values(reAlloc.Channels)...)
	}
	return opSet
}
// AssignPolicy assigns channels to nodes.
//
//   - currentCluster refers to the current channel distribution.
//   - toAssign refers to the target channels that need to be reassigned.
//     If provided, the policy only assigns these channels; if empty, the
//     policy balances currentCluster instead.
//   - exclusiveNodes lists the nodes that must not be assigned any channels.
type AssignPolicy func(currentCluster Assignments, toAssign *NodeChannelInfo, exclusiveNodes []int64) *ChannelOpSet
// EmptyAssignPolicy is a dummy AssignPolicy: it ignores all of its arguments
// and always returns nil, i.e. it never proposes any channel operation.
func EmptyAssignPolicy(currentCluster Assignments, toAssign *NodeChannelInfo, exclusiveNodes []int64) *ChannelOpSet {
	return nil
}
// AvgAssignByCountPolicy evens out the per-node channel count.
//
// It first classifies every node of currentCluster:
//   - when toAssign is nil, a node that holds channels is a donor;
//   - otherwise a node listed in exclusiveNodes, or the node toAssign comes
//     from, is ignored;
//   - any remaining node is a receiver.
//
// Donors and receivers both contribute to the channel total and to the node
// count used for averaging; ignored nodes contribute to neither.
//
// It returns nil when there is no receiver. If toAssign carries channels they
// are handed to assignNewChannels. Otherwise the existing distribution is
// rebalanced through balanceExistingChannels, provided the AutoBalance
// parameter is enabled and there is at least one donor; if not, nil is
// returned.
func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInfo, exclusiveNodes []int64) *ChannelOpSet {
	var (
		receivers    Assignments // nodes that may take channels
		donors       Assignments // nodes that may give channels away
		channelTotal int         // channels held by donors and receivers
	)
	// IDs of all nodes taking part in the average.
	countedNodes := typeutil.NewUniqueSet()
	balancing := toAssign == nil

	for _, node := range currentCluster {
		held := len(node.Channels)
		switch {
		case balancing && held > 0:
			donors = append(donors, node)
		case lo.Contains(exclusiveNodes, node.NodeID):
			// Excluded nodes neither receive channels nor count.
			continue
		case !balancing && node.NodeID == toAssign.NodeID:
			// The node being drained cannot receive its own channels.
			continue
		default:
			receivers = append(receivers, node)
		}
		channelTotal += held
		countedNodes.Insert(node.NodeID)
	}

	if len(receivers) == 0 {
		log.Info("No available nodes to receive channels")
		return nil
	}

	// Case 1: explicit channels were handed in for assignment.
	if !balancing && len(toAssign.Channels) > 0 {
		return assignNewChannels(receivers, toAssign, countedNodes.Len(), channelTotal, exclusiveNodes)
	}

	// Case 2: rebalance what is already there, if allowed and possible.
	if !Params.DataCoordCfg.AutoBalance.GetAsBool() {
		log.Info("Auto balance disabled")
		return nil
	}
	if len(donors) == 0 {
		log.Info("No source nodes to rebalance from")
		return nil
	}
	return balanceExistingChannels(currentCluster, donors, countedNodes.Len(), channelTotal, exclusiveNodes)
}
// assignNewChannels spreads the channels of toAssign over availableNodes.
//
// Each available node gets a quota of (totalChannelCount+len(toAssign.Channels))
// divided by nodeCount, with the remainder handed out one by one in
// availableNodes order. Every channel then goes to the least-loaded node that
// is still below its quota, or to the least-loaded node overall when all
// nodes have reached theirs. Load is a node's existing channels plus those
// already planned for it by this call.
//
// The returned ChannelOpSet contains, per receiving node, a Watch for the
// channels it gets and a matching Delete on toAssign.NodeID. exclusiveNodes is
// only used for logging. nodeCount must be non-zero.
func assignNewChannels(availableNodes Assignments, toAssign *NodeChannelInfo, nodeCount int, totalChannelCount int, exclusiveNodes []int64) *ChannelOpSet {
	finalTotal := totalChannelCount + len(toAssign.Channels)
	quota, leftover := finalTotal/nodeCount, finalTotal%nodeCount

	// Per-node quota; the first `leftover` nodes may take one extra channel.
	quotas := make(map[int64]int)
	for _, node := range availableNodes {
		want := quota
		if leftover > 0 {
			want++
			leftover--
		}
		quotas[node.NodeID] = want
	}

	// Channels planned for each node during this call.
	planned := make(map[int64][]RWChannel)
	load := func(node *NodeChannelInfo) int {
		return len(node.Channels) + len(planned[node.NodeID])
	}

	// Sort a private copy so the caller's slice keeps its order.
	ranked := make([]*NodeChannelInfo, len(availableNodes))
	copy(ranked, availableNodes)

	for _, ch := range toAssign.GetChannels() {
		// Re-rank on every step because the previous pick changed the loads.
		sort.Slice(ranked, func(i, j int) bool {
			return load(ranked[i]) < load(ranked[j])
		})

		// Default to the least-loaded node; prefer one still under quota.
		chosen := ranked[0]
		for _, node := range ranked {
			if load(node) < quotas[node.NodeID] {
				chosen = node
				break
			}
		}
		planned[chosen.NodeID] = append(planned[chosen.NodeID], ch)
	}

	operations := NewChannelOpSet()
	for nodeID, chs := range planned {
		operations.Append(nodeID, Watch, chs...)
		operations.Delete(toAssign.NodeID, chs...)
	}

	log.Info("Assign channels to nodes by channel count",
		zap.Int("toAssign channel count", len(toAssign.Channels)),
		zap.Any("original nodeID", toAssign.NodeID),
		zap.Int64s("exclusive nodes", exclusiveNodes),
		zap.Any("operations", operations),
		zap.Any("target distribution", quotas),
	)
	return operations
}
// balanceExistingChannels computes the operations that take surplus channels
// away from sourceNodes.
//
// Every node of currentCluster that is not in exclusiveNodes gets a quota of
// totalChannelCount/nodeCount, with the remainder handed out one by one in
// currentCluster order. A source node listed in exclusiveNodes gives up all of
// its channels through a Delete plus a Watch on bufferID; any other source
// node above its quota gets a Release for the excess.
//
// It returns nil when the base quota is zero; otherwise it returns the
// operation set, which is empty when no node has anything to give up. Note
// that sourceNodes is sorted in place (by channel count, descending).
// nodeCount must be non-zero.
func balanceExistingChannels(currentCluster Assignments, sourceNodes Assignments, nodeCount int, totalChannelCount int, exclusiveNodes []int64) *ChannelOpSet {
	quota := totalChannelCount / nodeCount
	leftover := totalChannelCount % nodeCount
	if quota == 0 {
		log.Info("Too few channels to distribute meaningfully")
		return nil
	}

	isExclusive := func(nodeID int64) bool {
		return lo.Contains(exclusiveNodes, nodeID)
	}

	// Per-node quota for every non-exclusive node; the first `leftover` of
	// them may keep one extra channel.
	quotas := make(map[int64]int)
	for _, node := range currentCluster {
		if isExclusive(node.NodeID) {
			continue
		}
		want := quota
		if leftover > 0 {
			want++
			leftover--
		}
		quotas[node.NodeID] = want
	}

	// Visit the most loaded nodes first.
	sort.Slice(sourceNodes, func(i, j int) bool {
		return len(sourceNodes[i].Channels) > len(sourceNodes[j].Channels)
	})

	// Channels each node has to give up.
	surplus := make(map[int64][]RWChannel)
	for _, node := range sourceNodes {
		if isExclusive(node.NodeID) {
			// Exclusive nodes must be emptied completely.
			surplus[node.NodeID] = lo.Values(node.Channels)
			continue
		}

		excess := len(node.Channels) - quotas[node.NodeID]
		if excess <= 0 {
			continue
		}
		// Give up the trailing `excess` channels of the value listing.
		held := lo.Values(node.Channels)
		surplus[node.NodeID] = held[len(held)-excess:]
	}

	operations := NewChannelOpSet()
	for nodeID, chs := range surplus {
		switch {
		case len(chs) == 0:
			// Nothing to move for this node.
		case isExclusive(nodeID):
			operations.Append(nodeID, Delete, chs...)
			operations.Append(bufferID, Watch, chs...) // park in the buffer
		default:
			operations.Append(nodeID, Release, chs...)
		}
	}

	log.Info("Balance channels across nodes",
		zap.Int64s("exclusive nodes", exclusiveNodes),
		zap.Int("total channel count", totalChannelCount),
		zap.Int("target channels per node", quota),
		zap.Any("target distribution", quotas),
		zap.Any("operations", operations),
	)
	return operations
}