mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Branch: pull/22331/head
parent a0e1cc4da8
commit 795dfb05bb
go.mod
@@ -160,7 +160,7 @@ require (
 	go.etcd.io/etcd/pkg/v3 v3.5.5 // indirect
 	go.etcd.io/etcd/raft/v3 v3.5.5 // indirect
 	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0 // indirect
-	go.opentelemetry.io/otel v1.0.1
+	go.opentelemetry.io/otel v1.0.1 // indirect
 	go.opentelemetry.io/otel/sdk v1.0.1 // indirect
 	go.opentelemetry.io/otel/trace v1.0.1 // indirect
 	go.opentelemetry.io/proto/otlp v0.9.0 // indirect
@@ -149,7 +149,8 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
 	}
 
 	if len(nodes) == len(stoppingNodesSegments) {
-		return b.handleStoppingNodes(replica, stoppingNodesSegments)
+		// no available nodes to balance
+		return nil, nil
 	}
 
 	average := totalCnt / len(nodesSegments)
@@ -234,35 +235,6 @@ outer:
 	return plans, b.getChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))
 }
 
-func (b *RowCountBasedBalancer) handleStoppingNodes(replica *meta.Replica, nodeSegments map[int64][]*meta.Segment) ([]SegmentAssignPlan, []ChannelAssignPlan) {
-	segmentPlans := make([]SegmentAssignPlan, 0, len(nodeSegments))
-	channelPlans := make([]ChannelAssignPlan, 0, len(nodeSegments))
-	for nodeID, segments := range nodeSegments {
-		for _, segment := range segments {
-			segmentPlan := SegmentAssignPlan{
-				ReplicaID: replica.ID,
-				From:      nodeID,
-				To:        -1,
-				Segment:   segment,
-				Weight:    GetWeight(1),
-			}
-			segmentPlans = append(segmentPlans, segmentPlan)
-		}
-		for _, dmChannel := range b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) {
-			channelPlan := ChannelAssignPlan{
-				ReplicaID: replica.ID,
-				From:      nodeID,
-				To:        -1,
-				Channel:   dmChannel,
-				Weight:    GetWeight(1),
-			}
-			channelPlans = append(channelPlans, channelPlan)
-		}
-	}
-
-	return segmentPlans, channelPlans
-}
-
 func (b *RowCountBasedBalancer) collectionStoppingSegments(stoppingNodesSegments map[int64][]*meta.Segment) ([]*meta.Segment, int) {
 	var (
 		segments []*meta.Segment
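
The two hunks above remove the handleStoppingNodes special case: when every node that serves the replica is stopping, balanceReplica now returns no plans at all instead of emitting weight-1 plans with To: -1. Below is a minimal, runnable sketch of just that guard; the plain int64 slices and string "plans" are stand-ins invented for illustration, not the real meta.Replica or SegmentAssignPlan types.

package main

import "fmt"

// balance models only the early-return guard added above. The real
// RowCountBasedBalancer.balanceReplica works on meta.Replica, segment
// distributions, and SegmentAssignPlan/ChannelAssignPlan values.
func balance(nodes, stoppingNodes []int64) []string {
	if len(nodes) == len(stoppingNodes) {
		// no available nodes to balance
		return nil
	}
	// In the real balancer, plans move segments and channels off the stopping
	// nodes onto the remaining healthy ones; a string stands in for a plan here.
	return []string{fmt.Sprintf("move segments off %v onto the healthy nodes", stoppingNodes)}
}

func main() {
	fmt.Println(balance([]int64{1, 2}, []int64{1, 2})) // [] -> every node is stopping, nothing to do
	fmt.Println(balance([]int64{1, 2}, []int64{2}))    // one plan for node 2
}

Run as an ordinary Go program, the first call prints an empty result because every node is stopping, while the second still yields a plan for the single stopping node.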
@@ -172,11 +172,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
 					{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
 				},
 			},
-			expectPlans: []SegmentAssignPlan{
-				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, From: 2, To: -1, ReplicaID: 1, Weight: weightHigh},
-				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2}, From: 2, To: -1, ReplicaID: 1, Weight: weightHigh},
-				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}, From: 1, To: -1, ReplicaID: 1, Weight: weightHigh},
-			},
+			expectPlans:        []SegmentAssignPlan{},
 			expectChannelPlans: []ChannelAssignPlan{},
 		},
 		{
@@ -53,6 +53,7 @@ var (
 	ErrListResourceGroupsFailed    = errors.New("failed to list resource group")
 	ErrDescribeResourceGroupFailed = errors.New("failed to describe resource group")
 	ErrLoadUseWrongRG              = errors.New("load operation should use collection's resource group")
+	ErrLoadWithDefaultRG           = errors.New("load operation can't use default resource group and other resource group together")
 )
 
 func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
@@ -360,6 +361,10 @@ func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string)
 		if len(collectionUsedRG) > 0 && !collectionUsedRG.Contain(rgName) {
 			return ErrLoadUseWrongRG
 		}
+
+		if len(resourceGroups) > 1 && rgName == meta.DefaultResourceGroupName {
+			return ErrLoadWithDefaultRG
+		}
 	}
 }
 
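
The new branch in checkResourceGroup rejects load requests that name the default resource group alongside any other group. Below is a small self-contained sketch of just that rule; the constant value and function name are assumptions for illustration and do not reproduce the server's full resource-group validation.

package main

import (
	"errors"
	"fmt"
)

// defaultResourceGroupName is an assumed placeholder for meta.DefaultResourceGroupName.
const defaultResourceGroupName = "__default_resource_group"

var errLoadWithDefaultRG = errors.New("load operation can't use default resource group and other resource group together")

// checkResourceGroups applies only the rule added above: a load request may
// target the default group or a list of other groups, but never both.
func checkResourceGroups(resourceGroups []string) error {
	for _, rgName := range resourceGroups {
		if len(resourceGroups) > 1 && rgName == defaultResourceGroupName {
			return errLoadWithDefaultRG
		}
	}
	return nil
}

func main() {
	fmt.Println(checkResourceGroups([]string{"rg1", "rg2"}))                    // <nil>
	fmt.Println(checkResourceGroups([]string{defaultResourceGroupName}))        // <nil>
	fmt.Println(checkResourceGroups([]string{defaultResourceGroupName, "rg1"})) // rejected
}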
@@ -1004,6 +1009,12 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
 		return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDropResourceGroupFailed.Error(), ErrNotHealthy), nil
 	}
 
+	replicas := s.meta.ReplicaManager.GetByResourceGroup(req.GetResourceGroup())
+	if len(replicas) > 0 {
+		return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+			fmt.Sprintf("some replicas still loaded in resource group[%s], release it first", req.GetResourceGroup()), meta.ErrDeleteNonEmptyRG), nil
+	}
+
 	err := s.meta.ResourceManager.RemoveResourceGroup(req.GetResourceGroup())
 	if err != nil {
 		log.Warn(ErrDropResourceGroupFailed.Error(), zap.Error(err))
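
DropResourceGroup now refuses to remove a resource group while replicas are still loaded in it, answering IllegalArgument with meta.ErrDeleteNonEmptyRG instead of deleting the group. A toy model of that precondition follows; the plain map is a stand-in for meta.ReplicaManager and is invented for this sketch.

package main

import (
	"errors"
	"fmt"
)

// replicasByRG is a plain-map stand-in for meta.ReplicaManager.GetByResourceGroup.
var replicasByRG = map[string][]int64{
	"rg1": {111},
	"rg2": {},
}

var errDeleteNonEmptyRG = errors.New("some replicas still loaded in resource group, release it first")

// dropResourceGroup models only the new precondition: a group that still
// hosts replicas cannot be removed.
func dropResourceGroup(name string) error {
	if len(replicasByRG[name]) > 0 {
		return errDeleteNonEmptyRG
	}
	delete(replicasByRG, name)
	return nil
}

func main() {
	fmt.Println(dropResourceGroup("rg1")) // rejected: replicas still loaded
	fmt.Println(dropResourceGroup("rg2")) // <nil>
}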
@@ -1035,6 +1046,11 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
 			fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
 	}
 
+	if req.GetNumNode() <= 0 {
+		return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+			fmt.Sprintf("transfer node num can't be [%d]", req.GetNumNode()), nil), nil
+	}
+
 	err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
 	if err != nil {
 		log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
@@ -1071,7 +1087,12 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
 	if len(replicas) > 0 {
 		return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
 			fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
-				len(replicas), req.GetSourceResourceGroup())), nil
+				len(replicas), req.GetTargetResourceGroup())), nil
+	}
+
+	if req.GetNumReplica() <= 0 {
+		return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+			fmt.Sprintf("transfer replica num can't be [%d]", req.GetNumReplica()), nil), nil
 	}
 
 	// for now, we don't support to transfer replica of same collection to same resource group
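
Both TransferNode and TransferReplica now validate the requested count before touching the resource manager and answer IllegalArgument for zero or negative values. A stripped-down sketch of that guard follows; the transferRequest struct is hypothetical and only mirrors the fields the new checks read from milvuspb.TransferNodeRequest and querypb.TransferReplicaRequest.

package main

import "fmt"

// transferRequest is a hypothetical stand-in carrying only the fields the new
// checks look at.
type transferRequest struct {
	SourceResourceGroup string
	TargetResourceGroup string
	Num                 int32 // NumNode or NumReplica in the real requests
}

// validateTransfer mirrors the added guards: the requested count must be positive.
func validateTransfer(req transferRequest) error {
	if req.Num <= 0 {
		return fmt.Errorf("transfer num can't be [%d]", req.Num)
	}
	return nil
}

func main() {
	fmt.Println(validateTransfer(transferRequest{SourceResourceGroup: "rg0", TargetResourceGroup: "rg1", Num: -1})) // error
	fmt.Println(validateTransfer(transferRequest{SourceResourceGroup: "rg0", TargetResourceGroup: "rg1", Num: 3}))  // <nil>
}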
@@ -533,11 +533,20 @@ func (suite *ServiceSuite) TestTransferNode() {
 	suite.NoError(err)
 	suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
 
+	resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
+		SourceResourceGroup: meta.DefaultResourceGroupName,
+		TargetResourceGroup: "rg1",
+		NumNode:             -1,
+	})
+	suite.NoError(err)
+	suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
+
 	// server unhealthy
 	server.status.Store(commonpb.StateCode_Abnormal)
 	resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
 		SourceResourceGroup: meta.DefaultResourceGroupName,
 		TargetResourceGroup: "rg1",
+		NumNode:             3,
 	})
 	suite.NoError(err)
 	suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
@@ -581,6 +590,15 @@ func (suite *ServiceSuite) TestTransferReplica() {
 	suite.NoError(err)
 	suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
 
+	resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
+		SourceResourceGroup: meta.DefaultResourceGroupName,
+		TargetResourceGroup: "rg1",
+		CollectionID:        1,
+		NumReplica:          0,
+	})
+	suite.NoError(err)
+	suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
+
 	suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
 		CollectionID: 1,
 		ID:           111,
@@ -655,6 +673,15 @@ func (suite *ServiceSuite) TestLoadCollectionFailed() {
 		suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
 	}
 
+	req := &querypb.LoadCollectionRequest{
+		CollectionID:   0,
+		ReplicaNumber:  2,
+		ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"},
+	}
+	resp, err := server.LoadCollection(ctx, req)
+	suite.NoError(err)
+	suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
+
 	// Test load with partitions loaded
 	for _, collection := range suite.collections {
 		if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
@@ -713,13 +740,22 @@ func (suite *ServiceSuite) TestLoadPartition() {
 		suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
 	}
 
+	req := &querypb.LoadPartitionsRequest{
+		CollectionID:   suite.collections[0],
+		PartitionIDs:   suite.partitions[suite.collections[0]],
+		ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"},
+	}
+	resp, err := server.LoadPartitions(ctx, req)
+	suite.NoError(err)
+	suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
+
 	// Test when server is not healthy
 	server.UpdateStateCode(commonpb.StateCode_Initializing)
-	req := &querypb.LoadPartitionsRequest{
+	req = &querypb.LoadPartitionsRequest{
 		CollectionID: suite.collections[0],
 		PartitionIDs: suite.partitions[suite.collections[0]],
 	}
-	resp, err := server.LoadPartitions(ctx, req)
+	resp, err = server.LoadPartitions(ctx, req)
 	suite.NoError(err)
 	suite.Contains(resp.Reason, ErrNotHealthy.Error())
 }
@@ -1323,6 +1323,57 @@ class TestResourceGroupMultiNodes(TestcaseBase):
 
         collection_w.get_replicas()
 
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_transfer_nodes_back(self):
+        self._connect()
+
+        rgA_name = "rgA"
+        self.init_resource_group(name=rgA_name)
+
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rgA_name,
+                                        num_node=2)
+        dim = 128
+        collection_w = self.init_collection_wrap(shards_num=1)
+        insert_ids = []
+        nb = 500
+        for i in range(5):
+            res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i * nb))
+            collection_w.flush()
+            insert_ids.extend(res.primary_keys)
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+
+        collection_w.load(replica_number=2, _resource_groups=[rgA_name, ct.default_resource_group_name])
+
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        # verify search succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids.copy(),
+                                         "limit": ct.default_limit}
+                            )
+        self.utility_wrap.transfer_node(source=rgA_name,
+                                        target=ct.default_resource_group_name,
+                                        num_node=2)
+
+        time.sleep(10)
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids.copy(),
+                                         "limit": ct.default_limit}
+                            )
+
+        collection_w.get_replicas()
+
     @pytest.mark.tags(CaseLabel.L0)
     def test_transfer_replica_not_enough_replicas_to_transfer(self):
         """