enhance: Enable node assign policy on resource group (#36968)

issue: #36977
With node_label_filter set on a resource group, the user can attach
labels to a querynode via the env `MILVUS_COMPONENT_LABEL`; the
resource group will then prefer to accept nodes that match its
node_label_filter.

Querynodes can therefore be grouped by label, and querynodes with the
same label are placed in the same resource group.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/37570/head
wei liu 2024-11-08 11:18:27 +08:00 committed by GitHub
parent bcb6420540
commit a03157838b
9 changed files with 742 additions and 138 deletions
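
To make the intent concrete, here is a minimal sketch of how the feature is exercised, using only names visible in this diff (rgpb.ResourceGroupConfig, rgpb.ResourceGroupNodeFilter, commonpb.KeyValuePair, ResourceManager.AddResourceGroup, and the MILVUS_SERVER_LABEL_ env prefix added in the session changes below); the helper function and the "dc_name"/"dc1" values are illustrative, not part of the PR.

package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
)

// newDC1GroupConfig builds a resource group config that requests two querynodes
// and filters on the label dc_name=dc1. Handing it to ResourceManager.AddResourceGroup
// (as the tests in this PR do) makes QueryCoord prefer dc1-labeled nodes for the group
// and treat mismatched members as dirty nodes to be moved out during auto-recovery.
func newDC1GroupConfig() *rgpb.ResourceGroupConfig {
	return &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 2},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 2},
		NodeFilter: &rgpb.ResourceGroupNodeFilter{
			NodeLabels: []*commonpb.KeyValuePair{
				{Key: "dc_name", Value: "dc1"},
			},
		},
	}
}

On the querynode side the label comes from the environment: the prefix parsed by this PR is MILVUS_SERVER_LABEL_ (SupportedLabelPrefix in the session changes below), so exporting MILVUS_SERVER_LABEL_dc_name=dc1 before start-up marks the node with dc_name=dc1.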


@ -6,6 +6,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -30,23 +31,25 @@ func newResourceGroupConfig(request int32, limit int32) *rgpb.ResourceGroupConfi
}
type ResourceGroup struct {
name string
nodes typeutil.UniqueSet
cfg *rgpb.ResourceGroupConfig
name string
nodes typeutil.UniqueSet
cfg *rgpb.ResourceGroupConfig
nodeMgr *session.NodeManager
}
// NewResourceGroup create resource group.
func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig) *ResourceGroup {
func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig, nodeMgr *session.NodeManager) *ResourceGroup {
rg := &ResourceGroup{
name: name,
nodes: typeutil.NewUniqueSet(),
cfg: cfg,
name: name,
nodes: typeutil.NewUniqueSet(),
cfg: cfg,
nodeMgr: nodeMgr,
}
return rg
}
// NewResourceGroupFromMeta create resource group from meta.
func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup {
func NewResourceGroupFromMeta(meta *querypb.ResourceGroup, nodeMgr *session.NodeManager) *ResourceGroup {
// Backward compatibility, recover the config from capacity.
if meta.Config == nil {
// If meta.Config is nil, which means the meta is from old version.
@ -57,7 +60,7 @@ func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup {
meta.Config = newResourceGroupConfig(meta.Capacity, meta.Capacity)
}
}
rg := NewResourceGroup(meta.Name, meta.Config)
rg := NewResourceGroup(meta.Name, meta.Config, nodeMgr)
for _, node := range meta.GetNodes() {
rg.nodes.Insert(node)
}
@ -91,14 +94,27 @@ func (rg *ResourceGroup) GetConfigCloned() *rgpb.ResourceGroupConfig {
return proto.Clone(rg.cfg).(*rgpb.ResourceGroupConfig)
}
// GetNodes return nodes of resource group.
// GetNodes return nodes of resource group which match required node labels
func (rg *ResourceGroup) GetNodes() []int64 {
return rg.nodes.Collect()
requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
if len(requiredNodeLabels) == 0 {
return rg.nodes.Collect()
}
ret := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
if rg.AcceptNode(nodeID) {
ret = append(ret, nodeID)
}
return true
})
return ret
}
// NodeNum return node count of resource group.
// NodeNum return node count of resource group which match required node labels
func (rg *ResourceGroup) NodeNum() int {
return rg.nodes.Len()
return len(rg.GetNodes())
}
// ContainNode return whether resource group contain node.
@ -106,40 +122,104 @@ func (rg *ResourceGroup) ContainNode(id int64) bool {
return rg.nodes.Contain(id)
}
// OversizedNumOfNodes return oversized nodes count. `len(node) - requests`
// OversizedNumOfNodes return oversized nodes count. `NodeNum - requests`, plus dirty nodes that no longer match the node filter
func (rg *ResourceGroup) OversizedNumOfNodes() int {
oversized := rg.nodes.Len() - int(rg.cfg.Requests.NodeNum)
oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum)
if oversized < 0 {
return 0
oversized = 0
}
return oversized
return oversized + len(rg.getDirtyNode())
}
// MissingNumOfNodes return lack nodes count. `requests - len(node)`
// MissingNumOfNodes return lack nodes count. `requests - NodeNum`
func (rg *ResourceGroup) MissingNumOfNodes() int {
missing := int(rg.cfg.Requests.NodeNum) - len(rg.nodes)
missing := int(rg.cfg.Requests.NodeNum) - rg.NodeNum()
if missing < 0 {
return 0
}
return missing
}
// ReachLimitNumOfNodes return reach limit nodes count. `limits - len(node)`
// ReachLimitNumOfNodes return reach limit nodes count. `limits - NodeNum`
func (rg *ResourceGroup) ReachLimitNumOfNodes() int {
reachLimit := int(rg.cfg.Limits.NodeNum) - len(rg.nodes)
reachLimit := int(rg.cfg.Limits.NodeNum) - rg.NodeNum()
if reachLimit < 0 {
return 0
}
return reachLimit
}
// RedundantOfNodes return redundant nodes count. `len(node) - limits`
// RedundantNumOfNodes return redundant nodes count. `NodeNum - limits`, plus dirty nodes that no longer match the node filter
func (rg *ResourceGroup) RedundantNumOfNodes() int {
redundant := len(rg.nodes) - int(rg.cfg.Limits.NodeNum)
redundant := rg.NodeNum() - int(rg.cfg.Limits.NodeNum)
if redundant < 0 {
return 0
redundant = 0
}
return redundant
return redundant + len(rg.getDirtyNode())
}
func (rg *ResourceGroup) getDirtyNode() []int64 {
dirtyNodes := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
if !rg.AcceptNode(nodeID) {
dirtyNodes = append(dirtyNodes, nodeID)
}
return true
})
return dirtyNodes
}
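// SelectNodeForRG picks a node in this resource group that targetRG can accept, preferring dirty nodes, then oversized ones; returns -1 if none is found.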
func (rg *ResourceGroup) SelectNodeForRG(targetRG *ResourceGroup) int64 {
// try to move out dirty node
for _, node := range rg.getDirtyNode() {
if targetRG.AcceptNode(node) {
return node
}
}
// try to move out oversized node
oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum)
if oversized > 0 {
for _, node := range rg.GetNodes() {
if targetRG.AcceptNode(node) {
return node
}
}
}
return -1
}
// AcceptNode returns whether the node's labels satisfy the resource group's node label filter; the default resource group accepts any node.
func (rg *ResourceGroup) AcceptNode(nodeID int64) bool {
if rg.GetName() == DefaultResourceGroupName {
return true
}
nodeInfo := rg.nodeMgr.Get(nodeID)
if nodeInfo == nil {
return false
}
requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
if len(requiredNodeLabels) == 0 {
return true
}
nodeLabels := nodeInfo.Labels()
if len(nodeLabels) == 0 {
return false
}
for _, labelPair := range requiredNodeLabels {
valueInNode, ok := nodeLabels[labelPair.Key]
if !ok || valueInNode != labelPair.Value {
return false
}
}
return true
}
// HasFrom return whether given resource group is in `from` of rg.
@ -176,9 +256,10 @@ func (rg *ResourceGroup) GetMeta() *querypb.ResourceGroup {
// Snapshot return a snapshot of resource group.
func (rg *ResourceGroup) Snapshot() *ResourceGroup {
return &ResourceGroup{
name: rg.name,
nodes: rg.nodes.Clone(),
cfg: rg.GetConfigCloned(),
name: rg.name,
nodes: rg.nodes.Clone(),
cfg: rg.GetConfigCloned(),
nodeMgr: rg.nodeMgr,
}
}
@ -186,18 +267,18 @@ func (rg *ResourceGroup) Snapshot() *ResourceGroup {
// Return error with reason if not meet requirement.
func (rg *ResourceGroup) MeetRequirement() error {
// if len(node) is less than requests, new node need to be assigned.
if rg.nodes.Len() < int(rg.cfg.Requests.NodeNum) {
if rg.MissingNumOfNodes() > 0 {
return errors.Errorf(
"has %d nodes, less than request %d",
rg.nodes.Len(),
rg.NodeNum(),
rg.cfg.Requests.NodeNum,
)
}
// if len(node) is greater than limits, node need to be removed.
if rg.nodes.Len() > int(rg.cfg.Limits.NodeNum) {
if rg.RedundantNumOfNodes() > 0 {
return errors.Errorf(
"has %d nodes, greater than limit %d",
rg.nodes.Len(),
rg.NodeNum(),
rg.cfg.Requests.NodeNum,
)
}


@ -5,8 +5,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestResourceGroup(t *testing.T) {
@ -24,7 +27,10 @@ func TestResourceGroup(t *testing.T) {
ResourceGroup: "rg3",
}},
}
rg := NewResourceGroup("rg1", cfg)
nodeMgr := session.NewNodeManager()
rg := NewResourceGroup("rg1", cfg, nodeMgr)
cfg2 := rg.GetConfig()
assert.Equal(t, cfg.Requests.NodeNum, cfg2.Requests.NodeNum)
@ -84,6 +90,9 @@ func TestResourceGroup(t *testing.T) {
}
assertion()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
// Test AddNode
mrg = rg.CopyForWrite()
mrg.AssignNode(1)
@ -108,6 +117,9 @@ func TestResourceGroup(t *testing.T) {
}
assertion()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
// Test AddNode until meet requirement.
mrg = rg.CopyForWrite()
mrg.AssignNode(2)
@ -132,6 +144,12 @@ func TestResourceGroup(t *testing.T) {
}
assertion()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
}))
// Test AddNode until exceed requirement.
mrg = rg.CopyForWrite()
mrg.AssignNode(3)
@ -202,12 +220,21 @@ func TestResourceGroup(t *testing.T) {
}
func TestResourceGroupMeta(t *testing.T) {
nodeMgr := session.NewNodeManager()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
rgMeta := &querypb.ResourceGroup{
Name: "rg1",
Capacity: 1,
Nodes: []int64{1, 2},
}
rg := NewResourceGroupFromMeta(rgMeta)
rg := NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
@ -225,6 +252,9 @@ func TestResourceGroupMeta(t *testing.T) {
assert.False(t, rg.ContainNode(4))
assert.Error(t, rg.MeetRequirement())
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
}))
rgMeta = &querypb.ResourceGroup{
Name: "rg1",
Capacity: 1,
@ -244,7 +274,7 @@ func TestResourceGroupMeta(t *testing.T) {
}},
},
}
rg = NewResourceGroupFromMeta(rgMeta)
rg = NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2, 4}, rg.GetNodes())
assert.Equal(t, 3, rg.NodeNum())
@ -271,7 +301,7 @@ func TestResourceGroupMeta(t *testing.T) {
Capacity: defaultResourceGroupCapacity,
Nodes: []int64{1, 2},
}
rg = NewResourceGroupFromMeta(rgMeta)
rg = NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, DefaultResourceGroupName, rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
@ -311,7 +341,7 @@ func TestResourceGroupMeta(t *testing.T) {
}},
},
}
rg = NewResourceGroupFromMeta(rgMeta)
rg = NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, DefaultResourceGroupName, rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
@ -332,3 +362,92 @@ func TestResourceGroupMeta(t *testing.T) {
newMeta = rg.GetMeta()
assert.Equal(t, int32(1000000), newMeta.Capacity)
}
func TestRGNodeFilter(t *testing.T) {
nodeMgr := session.NewNodeManager()
rg := NewResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 3,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 3,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "dc1",
},
},
},
}, nodeMgr)
rg.nodes = typeutil.NewSet[int64](1, 2, 3)
nodeInfo1 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Labels: map[string]string{
"dc_name": "dc1",
},
})
nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Labels: map[string]string{
"dc_name": "dc1",
},
})
nodeInfo3 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Labels: map[string]string{
"dc_name": "dc2",
},
})
nodeMgr.Add(nodeInfo1)
nodeMgr.Add(nodeInfo2)
nodeMgr.Add(nodeInfo3)
assert.True(t, rg.AcceptNode(1))
assert.True(t, rg.AcceptNode(2))
assert.False(t, rg.AcceptNode(3))
assert.Error(t, rg.MeetRequirement())
assert.Equal(t, rg.NodeNum(), 2)
assert.Len(t, rg.GetNodes(), 2)
rg2 := NewResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "dc2",
},
},
},
}, nodeMgr)
assert.Equal(t, rg.SelectNodeForRG(rg2), int64(3))
rg3 := NewResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "dc1",
},
},
},
}, nodeMgr)
assert.Equal(t, rg.SelectNodeForRG(rg3), int64(-1))
}


@ -62,7 +62,7 @@ type ResourceManager struct {
func NewResourceManager(catalog metastore.QueryCoordCatalog, nodeMgr *session.NodeManager) *ResourceManager {
groups := make(map[string]*ResourceGroup)
// Always create a default resource group to keep compatibility.
groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity))
groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity), nodeMgr)
return &ResourceManager{
incomingNode: typeutil.NewUniqueSet(),
groups: groups,
@ -91,7 +91,7 @@ func (rm *ResourceManager) Recover() error {
for _, meta := range rgs {
needUpgrade := meta.Config == nil
rg := NewResourceGroupFromMeta(meta)
rg := NewResourceGroupFromMeta(meta, rm.nodeMgr)
rm.groups[rg.GetName()] = rg
for _, node := range rg.GetNodes() {
if _, ok := rm.nodeIDMap[node]; ok {
@ -147,7 +147,7 @@ func (rm *ResourceManager) AddResourceGroup(rgName string, cfg *rgpb.ResourceGro
return err
}
rg := NewResourceGroup(rgName, cfg)
rg := NewResourceGroup(rgName, cfg, rm.nodeMgr)
if err := rm.catalog.SaveResourceGroup(rg.GetMeta()); err != nil {
log.Warn("failed to add resource group",
zap.String("rgName", rgName),
@ -550,135 +550,157 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) error {
// recoverMissingNodeRG recover resource group by transfer node from other resource group.
func (rm *ResourceManager) recoverMissingNodeRG(rgName string) error {
for rm.groups[rgName].MissingNumOfNodes() > 0 {
rg := rm.groups[rgName]
sourceRG := rm.selectMissingRecoverSourceRG(rg)
targetRG := rm.groups[rgName]
node, sourceRG := rm.selectNodeForMissingRecover(targetRG)
if sourceRG == nil {
log.Warn("fail to select source resource group", zap.String("rgName", rg.GetName()))
log.Warn("fail to select source resource group", zap.String("rgName", targetRG.GetName()))
return ErrNodeNotEnough
}
nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, rg)
err := rm.transferNode(targetRG.GetName(), node)
if err != nil {
log.Warn("failed to recover missing node by transfer node from other resource group",
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", rg.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", node),
zap.Error(err))
return err
}
log.Info("recover missing node by transfer node from other resource group",
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", rg.GetName()),
zap.Int64("nodeID", nodeID),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", node),
)
}
return nil
}
// selectMissingRecoverSourceRG select source resource group for recover missing resource group.
func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup) *ResourceGroup {
// First, Transfer node from most redundant resource group first. `len(nodes) > limits`
if redundantRG := rm.findMaxRGWithGivenFilter(
func(sourceRG *ResourceGroup) bool {
return rg.GetName() != sourceRG.GetName() && sourceRG.RedundantNumOfNodes() > 0
},
func(sourceRG *ResourceGroup) int {
return sourceRG.RedundantNumOfNodes()
},
); redundantRG != nil {
return redundantRG
// selectNodeForMissingRecover selects a node for missing recovery.
// It takes a target ResourceGroup and returns the selected node's ID and the source ResourceGroup with highest priority.
func (rm *ResourceManager) selectNodeForMissingRecover(targetRG *ResourceGroup) (int64, *ResourceGroup) {
computeRGPriority := func(rg *ResourceGroup) int {
// If the ResourceGroup has redundant nodes, boost its priority by 1,000,000.
if rg.RedundantNumOfNodes() > 0 {
return rg.RedundantNumOfNodes() * 1000000
}
// If the target ResourceGroup has a 'from' relationship with the current ResourceGroup,
// boost its priority by 100,000.
if targetRG.HasFrom(rg.GetName()) {
return rg.OversizedNumOfNodes() * 100000
}
return rg.OversizedNumOfNodes()
}
// Second, Transfer node from most oversized resource group. `len(nodes) > requests`
// `TransferFrom` configured resource group at high priority.
return rm.findMaxRGWithGivenFilter(
func(sourceRG *ResourceGroup) bool {
return rg.GetName() != sourceRG.GetName() && sourceRG.OversizedNumOfNodes() > 0
},
func(sourceRG *ResourceGroup) int {
if rg.HasFrom(sourceRG.GetName()) {
// give a boost if sourceRG is configured as `TransferFrom` to set as high priority to select.
return sourceRG.OversizedNumOfNodes() * resourceGroupTransferBoost
maxPriority := 0
var sourceRG *ResourceGroup
candidateNode := int64(-1)
for _, rg := range rm.groups {
if rg.GetName() == targetRG.GetName() {
continue
}
if rg.OversizedNumOfNodes() <= 0 {
continue
}
priority := computeRGPriority(rg)
if priority > maxPriority {
// Select a node from the current resource group that is preferred to be removed and assigned to the target resource group.
node := rg.SelectNodeForRG(targetRG)
// If no such node is found, skip the current resource group.
if node == -1 {
continue
}
return sourceRG.OversizedNumOfNodes()
})
sourceRG = rg
candidateNode = node
maxPriority = priority
}
}
return candidateNode, sourceRG
}
// recoverRedundantNodeRG recover resource group by transfer node to other resource group.
func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error {
for rm.groups[rgName].RedundantNumOfNodes() > 0 {
rg := rm.groups[rgName]
targetRG := rm.selectRedundantRecoverTargetRG(rg)
if targetRG == nil {
sourceRG := rm.groups[rgName]
node, targetRG := rm.selectNodeForRedundantRecover(sourceRG)
if node == -1 {
log.Info("failed to select redundant recover target resource group, please check resource group configuration if as expected.",
zap.String("rgName", rg.GetName()))
zap.String("rgName", sourceRG.GetName()))
return errors.New("all resource group reach limits")
}
nodeID, err := rm.transferOneNodeFromRGToRG(rg, targetRG)
if err != nil {
if err := rm.transferNode(targetRG.GetName(), node); err != nil {
log.Warn("failed to recover redundant node by transfer node to other resource group",
zap.String("sourceRG", rg.GetName()),
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", node),
zap.Error(err))
return err
}
log.Info("recover redundant node by transfer node to other resource group",
zap.String("sourceRG", rg.GetName()),
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", nodeID),
zap.Int64("nodeID", node),
)
}
return nil
}
// selectRedundantRecoverTargetRG select target resource group for recover redundant resource group.
func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup) *ResourceGroup {
// First, Transfer node to most missing resource group first.
if missingRG := rm.findMaxRGWithGivenFilter(
func(targetRG *ResourceGroup) bool {
return rg.GetName() != targetRG.GetName() && targetRG.MissingNumOfNodes() > 0
},
func(targetRG *ResourceGroup) int {
return targetRG.MissingNumOfNodes()
},
); missingRG != nil {
return missingRG
// selectNodeForRedundantRecover selects a node for redundant recovery.
// It takes a source ResourceGroup and returns the selected node's ID and the target ResourceGroup with highest priority.
func (rm *ResourceManager) selectNodeForRedundantRecover(sourceRG *ResourceGroup) (int64, *ResourceGroup) {
// computeRGPriority calculates the priority of a ResourceGroup based on certain conditions.
computeRGPriority := func(rg *ResourceGroup) int {
// If the ResourceGroup is missing nodes, boost its priority by 1,000,000.
if rg.MissingNumOfNodes() > 0 {
return rg.MissingNumOfNodes() * 1000000
}
// If the source ResourceGroup has a 'to' relationship with the current ResourceGroup,
// boost its priority by 100,000.
if sourceRG.HasTo(rg.GetName()) {
return rg.ReachLimitNumOfNodes() * 100000
}
return rg.ReachLimitNumOfNodes()
}
// Second, Transfer node to max reachLimit resource group.
// `TransferTo` configured resource group at high priority.
if selectRG := rm.findMaxRGWithGivenFilter(
func(targetRG *ResourceGroup) bool {
return rg.GetName() != targetRG.GetName() && targetRG.ReachLimitNumOfNodes() > 0
},
func(targetRG *ResourceGroup) int {
if rg.HasTo(targetRG.GetName()) {
// give a boost if targetRG is configured as `TransferTo` to set as high priority to select.
return targetRG.ReachLimitNumOfNodes() * resourceGroupTransferBoost
maxPriority := 0
var targetRG *ResourceGroup
candidateNode := int64(-1)
for _, rg := range rm.groups {
if rg.GetName() == sourceRG.GetName() {
continue
}
if rg.ReachLimitNumOfNodes() <= 0 {
continue
}
// Calculate the priority of the current resource group.
priority := computeRGPriority(rg)
if priority > maxPriority {
// Select a node from the source resource group that the candidate target group can accept.
node := sourceRG.SelectNodeForRG(rg)
// If no such node is found, skip the current resource group.
if node == -1 {
continue
}
return targetRG.ReachLimitNumOfNodes()
},
); selectRG != nil {
return selectRG
candidateNode = node
targetRG = rg
maxPriority = priority
}
}
// Finally, Always transfer node to default resource group.
if rg.GetName() != DefaultResourceGroupName {
return rm.groups[DefaultResourceGroupName]
// Finally, always transfer the node to the default resource group if no other target resource group is found.
if targetRG == nil && sourceRG.GetName() != DefaultResourceGroupName {
targetRG = rm.groups[DefaultResourceGroupName]
if sourceRG != nil {
candidateNode = sourceRG.SelectNodeForRG(targetRG)
}
}
return nil
}
// transferOneNodeFromRGToRG transfer one node from source resource group to target resource group.
func (rm *ResourceManager) transferOneNodeFromRGToRG(sourceRG *ResourceGroup, targetRG *ResourceGroup) (int64, error) {
if sourceRG.NodeNum() == 0 {
return -1, ErrNodeNotEnough
}
// TODO: select node by some load strategy, such as segment loaded.
node := sourceRG.GetNodes()[0]
if err := rm.transferNode(targetRG.GetName(), node); err != nil {
return -1, err
}
return node, nil
return candidateNode, targetRG
}
// assignIncomingNodeWithNodeCheck assign node to resource group with node status check.
@ -715,7 +737,7 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) {
}
// select a resource group to assign incoming node.
rg = rm.mustSelectAssignIncomingNodeTargetRG()
rg = rm.mustSelectAssignIncomingNodeTargetRG(node)
if err := rm.transferNode(rg.GetName(), node); err != nil {
return "", errors.Wrap(err, "at finally assign to default resource group")
}
@ -723,11 +745,11 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) {
}
// mustSelectAssignIncomingNodeTargetRG select resource group for assign incoming node.
func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup {
func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(nodeID int64) *ResourceGroup {
// First, Assign it to rg with the most missing nodes at high priority.
if rg := rm.findMaxRGWithGivenFilter(
func(rg *ResourceGroup) bool {
return rg.MissingNumOfNodes() > 0
return rg.MissingNumOfNodes() > 0 && rg.AcceptNode(nodeID)
},
func(rg *ResourceGroup) int {
return rg.MissingNumOfNodes()
@ -739,7 +761,7 @@ func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup
// Second, assign it to rg do not reach limit.
if rg := rm.findMaxRGWithGivenFilter(
func(rg *ResourceGroup) bool {
return rg.ReachLimitNumOfNodes() > 0
return rg.ReachLimitNumOfNodes() > 0 && rg.AcceptNode(nodeID)
},
func(rg *ResourceGroup) int {
return rg.ReachLimitNumOfNodes()
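
For reference, the priority ordering used by the two recovery paths above can be summarized in a standalone sketch; the constants and branch order are taken from computeRGPriority in the diff, while the helper function itself is illustrative and not part of the Milvus codebase.

package example

// missingRecoverPriority mirrors computeRGPriority in selectNodeForMissingRecover above:
// a source group with redundant nodes outranks a group listed in the target's TransferFrom,
// which in turn outranks a merely oversized group. The redundant-recovery path mirrors
// this with MissingNumOfNodes, HasTo, and ReachLimitNumOfNodes.
func missingRecoverPriority(redundant, oversized int, configuredAsTransferFrom bool) int {
	if redundant > 0 {
		return redundant * 1000000
	}
	if configuredAsTransferFrom {
		return oversized * 100000
	}
	return oversized
}

For example, a group holding 2 redundant nodes scores 2,000,000 and is drained before a TransferFrom source with 3 oversized nodes (300,000).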


@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
@ -30,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -209,6 +211,7 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() {
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg2": newResourceGroupConfig(0, 0),
})
log.Info("xxxxx")
// RemoveResourceGroup will remove all nodes from the resource group.
err = suite.manager.RemoveResourceGroup("rg2")
suite.NoError(err)
@ -625,10 +628,11 @@ func (suite *ResourceManagerSuite) TestUnassignFail() {
}
func TestGetResourceGroupsJSON(t *testing.T) {
nodeManager := session.NewNodeManager()
manager := &ResourceManager{groups: make(map[string]*ResourceGroup)}
rg1 := NewResourceGroup("rg1", newResourceGroupConfig(0, 10))
rg1 := NewResourceGroup("rg1", newResourceGroupConfig(0, 10), nodeManager)
rg1.nodes = typeutil.NewUniqueSet(1, 2)
rg2 := NewResourceGroup("rg2", newResourceGroupConfig(0, 20))
rg2 := NewResourceGroup("rg2", newResourceGroupConfig(0, 20), nodeManager)
rg2.nodes = typeutil.NewUniqueSet(3, 4)
manager.groups["rg1"] = rg1
manager.groups["rg2"] = rg2
@ -653,3 +657,337 @@ func TestGetResourceGroupsJSON(t *testing.T) {
checkResult(rg)
}
}
func (suite *ResourceManagerSuite) TestNodeLabels_NodeAssign() {
suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label1",
},
},
},
})
suite.manager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label2",
},
},
},
})
suite.manager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label3",
},
},
},
})
// test that all query nodes have been marked with label1
for i := 1; i <= 30; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label1",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// test new querynode with label2
for i := 31; i <= 40; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label2",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ := suite.manager.GetNodes("rg2")
for _, node := range nodesInRG {
suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
// test new querynode with label3
for i := 41; i <= 50; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label3",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ = suite.manager.GetNodes("rg3")
for _, node := range nodesInRG {
suite.Equal("label3", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
// test swap rg's label
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": {
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label2",
},
},
},
},
"rg2": {
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label3",
},
},
},
},
"rg3": {
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label1",
},
},
},
},
})
log.Info("test swap rg's label")
for i := 0; i < 4; i++ {
suite.manager.AutoRecoverResourceGroup("rg1")
suite.manager.AutoRecoverResourceGroup("rg2")
suite.manager.AutoRecoverResourceGroup("rg3")
suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ = suite.manager.GetNodes("rg1")
for _, node := range nodesInRG {
suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
nodesInRG, _ = suite.manager.GetNodes("rg2")
for _, node := range nodesInRG {
suite.Equal("label3", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
nodesInRG, _ = suite.manager.GetNodes("rg3")
for _, node := range nodesInRG {
suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
}
func (suite *ResourceManagerSuite) TestNodeLabels_NodeDown() {
suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label1",
},
},
},
})
suite.manager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label2",
},
},
},
})
suite.manager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label3",
},
},
},
})
// test that all query nodes have been marked with label1
for i := 1; i <= 10; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label1",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
// test new querynode with label2
for i := 31; i <= 40; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label2",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
// test new querynode with label3
for i := 41; i <= 50; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label3",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
// test node down with label1
suite.manager.HandleNodeDown(int64(1))
suite.manager.nodeMgr.Remove(int64(1))
suite.Equal(9, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
// test node up with label2
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(101),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label2",
},
}))
suite.manager.HandleNodeUp(int64(101))
suite.Equal(9, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// test node up with label1
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(102),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label1",
},
}))
suite.manager.HandleNodeUp(int64(102))
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ := suite.manager.GetNodes("rg1")
for _, node := range nodesInRG {
suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
suite.manager.AutoRecoverResourceGroup("rg1")
suite.manager.AutoRecoverResourceGroup("rg2")
suite.manager.AutoRecoverResourceGroup("rg3")
suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
nodesInRG, _ = suite.manager.GetNodes(DefaultResourceGroupName)
for _, node := range nodesInRG {
suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
}


@ -130,12 +130,8 @@ func (suite *ResourceObserverSuite) TestObserverRecoverOperation() {
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3"))
// but new node with id 10 is not in
suite.nodeMgr.Remove(10)
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3"))
// new node is down, rg3 cannot use that node anymore.
suite.nodeMgr.Remove(10)
suite.meta.ResourceManager.HandleNodeDown(10)
suite.observer.checkAndRecoverResourceGroup()
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))


@ -507,6 +507,7 @@ func (s *Server) startQueryCoord() error {
Address: node.Address,
Hostname: node.HostName,
Version: node.Version,
Labels: node.GetServerLabel(),
}))
s.taskScheduler.AddExecutor(node.ServerID)
@ -745,6 +746,7 @@ func (s *Server) watchNodes(revision int64) {
Address: addr,
Hostname: event.Session.HostName,
Version: event.Session.Version,
Labels: event.Session.GetServerLabel(),
}))
s.nodeUpEventChan <- nodeID
select {


@ -111,6 +111,7 @@ type ImmutableNodeInfo struct {
Address string
Hostname string
Version semver.Version
Labels map[string]string
}
const (
@ -147,6 +148,10 @@ func (n *NodeInfo) Hostname() string {
return n.immutableInfo.Hostname
}
func (n *NodeInfo) Labels() map[string]string {
return n.immutableInfo.Labels
}
func (n *NodeInfo) SegmentCnt() int {
n.mu.RLock()
defer n.mu.RUnlock()


@ -48,7 +48,8 @@ const (
// DefaultServiceRoot default root path used in kv by Session
DefaultServiceRoot = "session/"
// DefaultIDKey default id key for Session
DefaultIDKey = "id"
DefaultIDKey = "id"
SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
)
// SessionEventType session event type
@ -100,8 +101,9 @@ type SessionRaw struct {
IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"`
LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"`
HostName string `json:"HostName,omitempty"`
EnableDisk bool `json:"EnableDisk,omitempty"`
HostName string `json:"HostName,omitempty"`
EnableDisk bool `json:"EnableDisk,omitempty"`
ServerLabels map[string]string `json:"ServerLabels,omitempty"`
}
func (s *SessionRaw) GetAddress() string {
@ -112,6 +114,10 @@ func (s *SessionRaw) GetServerID() int64 {
return s.ServerID
}
func (s *SessionRaw) GetServerLabel() map[string]string {
return s.ServerLabels
}
func (s *SessionRaw) IsTriggerKill() bool {
return s.TriggerKill
}
@ -286,7 +292,8 @@ func (s *Session) Init(serverName, address string, exclusive bool, triggerKill b
panic(err)
}
s.ServerID = serverID
log.Info("start server", zap.String("name", serverName), zap.String("address", address), zap.Int64("id", s.ServerID))
s.ServerLabels = GetServerLabelsFromEnv(serverName)
log.Info("start server", zap.String("name", serverName), zap.String("address", address), zap.Int64("id", s.ServerID), zap.Any("server_labels", s.ServerLabels))
}
// String makes Session struct able to be logged by zap
@ -329,6 +336,25 @@ func (s *Session) getServerID() (int64, error) {
return nodeID, nil
}
func GetServerLabelsFromEnv(role string) map[string]string {
ret := make(map[string]string)
switch role {
case "querynode":
for _, value := range os.Environ() {
rs := []rune(value)
in := strings.Index(value, "=")
key := string(rs[0:in])
value := string(rs[in+1:])
if strings.HasPrefix(key, SupportedLabelPrefix) {
label := strings.TrimPrefix(key, SupportedLabelPrefix)
ret[label] = value
}
}
}
return ret
}
func (s *Session) checkIDExist() {
s.etcdCli.Txn(s.ctx).If(
clientv3.Compare(
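
Pieced together from the hunks above, the label flows end to end as follows; this is a reading aid using only names present in this diff, not additional behavior.

// 1. The querynode process is started with MILVUS_SERVER_LABEL_<key>=<value> in its
//    environment; Session.Init stores GetServerLabelsFromEnv("querynode") in ServerLabels.
// 2. QueryCoord's startQueryCoord and watchNodes copy event.Session.GetServerLabel()
//    into session.ImmutableNodeInfo.Labels when registering the node.
// 3. ResourceGroup.AcceptNode compares the group's NodeFilter.NodeLabels against
//    nodeMgr.Get(nodeID).Labels(), so HandleNodeUp and AutoRecoverResourceGroup steer
//    each node toward a group whose filter matches its labels; nodes matching no
//    group's filter fall back to the default resource group, which accepts any node.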


@ -1064,6 +1064,21 @@ func (s *SessionSuite) TestSafeCloseLiveCh() {
})
}
func (s *SessionSuite) TestGetSessions() {
os.Setenv("MILVUS_SERVER_LABEL_key1", "value1")
os.Setenv("MILVUS_SERVER_LABEL_key2", "value2")
os.Setenv("key3", "value3")
defer os.Unsetenv("MILVUS_SERVER_LABEL_key1")
defer os.Unsetenv("MILVUS_SERVER_LABEL_key2")
defer os.Unsetenv("key3")
ret := GetServerLabelsFromEnv("querynode")
assert.Equal(s.T(), 2, len(ret))
assert.Equal(s.T(), "value1", ret["key1"])
assert.Equal(s.T(), "value2", ret["key2"])
}
func TestSessionSuite(t *testing.T) {
suite.Run(t, new(SessionSuite))
}