mirror of https://github.com/milvus-io/milvus.git

enhance: make configure load param feature be compatible with old sdk (#35520)

issue: #31570 #35521

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

branch: pull/35600/head
parent 6e29d713b3
commit 22ced010cd
@@ -362,7 +362,7 @@ func (suite *JobSuite) TestLoadCollection() {
 	)
 	suite.scheduler.Add(job)
 	err := job.Wait()
-	suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
+	suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
 
 	// Load with 3 replica on 3 rg
 	req = &querypb.LoadCollectionRequest{
@@ -384,7 +384,7 @@ func (suite *JobSuite) TestLoadCollection() {
 	)
 	suite.scheduler.Add(job)
 	err = job.Wait()
-	suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
+	suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
 }
 
 func (suite *JobSuite) TestLoadCollectionWithReplicas() {
@@ -414,7 +414,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
 		)
 		suite.scheduler.Add(job)
 		err := job.Wait()
-		suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
+		suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
 	}
 }
 
@@ -596,7 +596,7 @@ func (suite *JobSuite) TestLoadPartition() {
 	)
 	suite.scheduler.Add(job)
 	err := job.Wait()
-	suite.Contains(err.Error(), meta.ErrNodeNotEnough.Error())
+	suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
 
 	// test load 3 replica in 3 rg, should pass rg check
 	req = &querypb.LoadPartitionsRequest{
@@ -619,7 +619,7 @@ func (suite *JobSuite) TestLoadPartition() {
 	)
 	suite.scheduler.Add(job)
 	err = job.Wait()
-	suite.Contains(err.Error(), meta.ErrNodeNotEnough.Error())
+	suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
 }
 
 func (suite *JobSuite) TestDynamicLoad() {
@@ -766,7 +766,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
 		)
 		suite.scheduler.Add(job)
 		err := job.Wait()
-		suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
+		suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
 	}
 }
 
@@ -1107,7 +1107,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() {
 		)
 		suite.scheduler.Add(job)
 		err := job.Wait()
-		suite.ErrorIs(err, meta.ErrNodeNotEnough)
+		suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
 	}
 }
@@ -216,7 +216,9 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
 		return merr.Status(err), nil
 	}
 
-	if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
+	// To be compatible with old SDKs, which set replica=1 when the replica number is not specified,
+	// fall back to the configured load info only when both replica number and resource groups are unset.
+	if req.GetReplicaNumber() <= 0 && len(req.GetResourceGroups()) == 0 {
 		// when replica number or resource groups is not set, use pre-defined load config
 		rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
 		if err != nil {
@@ -333,7 +335,9 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
 		return merr.Status(err), nil
 	}
 
-	if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
+	// To be compatible with old SDKs, which set replica=1 when the replica number is not specified,
+	// fall back to the configured load info only when both replica number and resource groups are unset.
+	if req.GetReplicaNumber() <= 0 && len(req.GetResourceGroups()) == 0 {
 		// when replica number or resource groups is not set, use database level config
 		rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
 		if err != nil {
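The behavioral core of this patch is the switch from `||` to `&&` in the fallback condition above. A minimal runnable sketch of that decision follows; loadParams and useConfiguredLoadInfo are illustrative stand-ins, not the actual querypb types.

package main

import "fmt"

// loadParams mirrors only the two request fields the fallback inspects.
// It is an illustrative stand-in, not the actual querypb request type.
type loadParams struct {
	ReplicaNumber  int32
	ResourceGroups []string
}

// useConfiguredLoadInfo reports whether the server should fall back to the
// pre-defined load config. Old SDKs always send replica=1 even when the user
// did not choose one, so the fallback must require BOTH fields to be unset
// (&&) rather than either one (||).
func useConfiguredLoadInfo(p loadParams) bool {
	return p.ReplicaNumber <= 0 && len(p.ResourceGroups) == 0
}

func main() {
	// Explicit replica number, no resource groups: the old `||` condition
	// would wrongly trigger the fallback here; `&&` honors the request.
	fmt.Println(useConfiguredLoadInfo(loadParams{ReplicaNumber: 2})) // false

	// Nothing specified: use the configured load info.
	fmt.Println(useConfiguredLoadInfo(loadParams{})) // true

	// Resource groups set explicitly: honor the request.
	fmt.Println(useConfiguredLoadInfo(loadParams{ResourceGroups: []string{"rg1"}})) // false
}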
@@ -17,6 +17,8 @@
 package utils
 
 import (
+	"strings"
+
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -27,13 +29,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
-var (
-	ErrGetNodesFromRG       = errors.New("failed to get node from rg")
-	ErrNoReplicaFound       = errors.New("no replica found during assign nodes")
-	ErrReplicasInconsistent = errors.New("all replicas should belong to same collection during assign nodes")
-	ErrUseWrongNumRG        = errors.New("resource group num can only be 0, 1 or same as replica number")
-)
-
 func GetPartitions(collectionMgr *meta.CollectionManager, collectionID int64) ([]int64, error) {
 	collection := collectionMgr.GetCollection(collectionID)
 	if collection != nil {
@@ -117,7 +112,8 @@ func RecoverAllCollection(m *meta.Meta) {
 
 func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int32) (map[string]int, error) {
 	if len(resourceGroups) != 0 && len(resourceGroups) != 1 && len(resourceGroups) != int(replicaNumber) {
-		return nil, ErrUseWrongNumRG
+		return nil, errors.Errorf(
+			"replica=[%d] resource group=[%s], resource group num can only be 0, 1 or same as replica number", replicaNumber, strings.Join(resourceGroups, ","))
 	}
 
 	replicaNumInRG := make(map[string]int)
@@ -140,15 +136,16 @@ func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int
 	// 3. replica1 spawn finished, but cannot find related resource group.
 	for rgName, num := range replicaNumInRG {
 		if !m.ContainResourceGroup(rgName) {
-			return nil, ErrGetNodesFromRG
+			return nil, merr.WrapErrResourceGroupNotFound(rgName)
 		}
 		nodes, err := m.ResourceManager.GetNodes(rgName)
 		if err != nil {
 			return nil, err
 		}
 		if num > len(nodes) {
-			log.Warn("node not enough", zap.Error(meta.ErrNodeNotEnough), zap.Int("replicaNum", num), zap.Int("nodeNum", len(nodes)), zap.String("rgName", rgName))
-			return nil, meta.ErrNodeNotEnough
+			err := merr.WrapErrResourceGroupNodeNotEnough(rgName, len(nodes), num)
+			log.Warn("failed to check resource group", zap.Error(err))
+			return nil, err
 		}
 	}
 	return replicaNumInRG, nil
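The wrap helpers introduced above keep a sentinel error in the chain, which is what lets the job tests assert identity with suite.ErrorIs instead of matching message text with suite.ErrorContains. A sketch of the wrap-then-Is pattern using github.com/cockroachdb/errors; the sentinel and helper below are stand-ins, and the real merr definitions differ in detail.

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// Stand-ins for merr.ErrResourceGroupNodeNotEnough and
// merr.WrapErrResourceGroupNodeNotEnough.
var errResourceGroupNodeNotEnough = errors.New("resource group node not enough")

func wrapErrResourceGroupNodeNotEnough(rg string, available, required int) error {
	return errors.Wrapf(errResourceGroupNodeNotEnough,
		"rg=%s, available=%d, required=%d", rg, available, required)
}

func main() {
	err := wrapErrResourceGroupNodeNotEnough("rg1", 1, 3)

	// The wrap keeps the sentinel in the chain, so errors.Is still matches
	// even though the message carries per-call context.
	fmt.Println(errors.Is(err, errResourceGroupNodeNotEnough)) // true
	fmt.Println(err) // rg=rg1, available=1, required=3: resource group node not enough
}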
@@ -710,7 +710,7 @@ class TestTransferNode(TestcaseBase):
 
         # load with different replicas
         error = {ct.err_code: 999,
-                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'}
+                 ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'}
         collection_w.load(replica_number=replicas,
                           _resource_groups=[rgA_name, rgB_name],
                           check_task=CheckTasks.err_res, check_items=error)
@@ -877,7 +877,7 @@ class TestTransferNode(TestcaseBase):
 
         # load with different replicas
         error = {ct.err_code: 999,
-                 ct.err_msg: 'failed to load partitions, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'}
+                 ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'}
         partition_w.load(replica_number=replicas,
                          _resource_groups=[rgA_name, rgB_name],
                          check_task=CheckTasks.err_res, check_items=error)
@@ -1210,7 +1210,7 @@ class TestResourceGroupMultiNodes(TestcaseBase):
         # load 3 replicas in rgA and rgB
         replica_number = 3
         error = {ct.err_code: 999,
-                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'}
+                 ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'}
         collection_w.load(replica_number=replica_number,
                           _resource_groups=[rgA_name, rgB_name],
                           check_task=CheckTasks.err_res,
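All three updated test expectations assert the same rule enforced by checkResourceGroup. A standalone Go sketch of that rule; the helper name validResourceGroupCount is illustrative, not part of the codebase.

package main

import "fmt"

// validResourceGroupCount captures the rule behind the expected error message
// "resource group num can only be 0, 1 or same as replica number".
func validResourceGroupCount(numResourceGroups, replicaNumber int) bool {
	return numResourceGroups == 0 ||
		numResourceGroups == 1 ||
		numResourceGroups == replicaNumber
}

func main() {
	// The failing case the tests exercise: 3 replicas across 2 resource groups.
	fmt.Println(validResourceGroupCount(2, 3)) // false -> load is rejected

	// 3 replicas across 3 resource groups passes the check.
	fmt.Println(validResourceGroupCount(3, 3)) // true
}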