mirror of https://github.com/milvus-io/milvus.git
parent
e27907cd29
commit
34c20581f3
|
@ -648,11 +648,7 @@ func (s *Server) handleNodeUp(node int64) {
|
|||
zap.String("resourceGroup", rgName),
|
||||
)
|
||||
|
||||
rgs := s.meta.ResourceManager.ListResourceGroups()
|
||||
if len(rgs) == 1 {
|
||||
// only __default_resource_group exists
|
||||
utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
|
||||
}
|
||||
utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
|
||||
}
|
||||
|
||||
func (s *Server) handleNodeDown(node int64) {
|
||||
|
|
|
@ -1120,7 +1120,14 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
|
|||
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
|
||||
}
|
||||
|
||||
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup())
|
||||
replicas := s.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
|
||||
if (req.GetSourceResourceGroup() == meta.DefaultResourceGroupName || req.GetTargetResourceGroup() == meta.DefaultResourceGroupName) &&
|
||||
len(replicas) != int(req.GetNumReplica()) {
|
||||
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
|
||||
"transfer replica will cause replica loaded in both default rg and other rg", nil), nil
|
||||
}
|
||||
|
||||
replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup())
|
||||
if len(replicas) > 0 {
|
||||
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
|
||||
fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
|
||||
|
|
|
@ -600,7 +600,7 @@ func (suite *ServiceSuite) TestTransferReplica() {
|
|||
suite.NoError(err)
|
||||
|
||||
resp, err := suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||
SourceResourceGroup: "rg2",
|
||||
TargetResourceGroup: "rg1",
|
||||
CollectionID: 1,
|
||||
NumReplica: 2,
|
||||
|
@ -608,6 +608,15 @@ func (suite *ServiceSuite) TestTransferReplica() {
|
|||
suite.NoError(err)
|
||||
suite.Contains(resp.Reason, "only found [0] replicas in source resource group")
|
||||
|
||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||
TargetResourceGroup: "rg1",
|
||||
CollectionID: 1,
|
||||
NumReplica: 2,
|
||||
})
|
||||
suite.NoError(err)
|
||||
suite.Contains(resp.Reason, "transfer replica will cause replica loaded in both default rg and other rg")
|
||||
|
||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||
SourceResourceGroup: "rgg",
|
||||
TargetResourceGroup: meta.DefaultResourceGroupName,
|
||||
|
@ -655,26 +664,29 @@ func (suite *ServiceSuite) TestTransferReplica() {
|
|||
suite.server.nodeMgr.Add(session.NewNodeInfo(1002, "localhost"))
|
||||
suite.server.nodeMgr.Add(session.NewNodeInfo(1003, "localhost"))
|
||||
suite.server.nodeMgr.Add(session.NewNodeInfo(1004, "localhost"))
|
||||
suite.server.nodeMgr.Add(session.NewNodeInfo(1005, "localhost"))
|
||||
suite.server.meta.AssignNode("rg1", 1001)
|
||||
suite.server.meta.AssignNode("rg2", 1002)
|
||||
suite.server.meta.AssignNode("rg3", 1003)
|
||||
suite.server.meta.AssignNode("rg3", 1004)
|
||||
suite.server.meta.AssignNode("rg3", 1005)
|
||||
|
||||
replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(1))
|
||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||
TargetResourceGroup: "rg3",
|
||||
CollectionID: 1,
|
||||
NumReplica: 2,
|
||||
NumReplica: int64(replicaNum),
|
||||
})
|
||||
|
||||
suite.NoError(err)
|
||||
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
|
||||
suite.Len(suite.server.meta.GetByResourceGroup("rg3"), 2)
|
||||
suite.Len(suite.server.meta.GetByResourceGroup("rg3"), 3)
|
||||
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
|
||||
SourceResourceGroup: meta.DefaultResourceGroupName,
|
||||
TargetResourceGroup: "rg3",
|
||||
CollectionID: 1,
|
||||
NumReplica: 2,
|
||||
NumReplica: int64(replicaNum),
|
||||
})
|
||||
suite.NoError(err)
|
||||
suite.Contains(resp.Reason, "dynamically increase replica num is unsupported")
|
||||
|
@ -1595,8 +1607,9 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
|
|||
suite.nodeMgr.Add(session.NewNodeInfo(222, "localhost"))
|
||||
server.handleNodeUp(222)
|
||||
nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes()
|
||||
suite.Len(nodes, 1)
|
||||
suite.Equal(int64(111), nodes[0])
|
||||
suite.Len(nodes, 2)
|
||||
suite.Contains(nodes, int64(111))
|
||||
suite.Contains(nodes, int64(222))
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) loadAll() {
|
||||
|
|
Loading…
Reference in New Issue