fix transfer node (#22120)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/22182/head
wei liu 2023-02-14 16:16:34 +08:00 committed by GitHub
parent 33de788ba8
commit 7b4511b8f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 28 deletions

View File

@ -489,7 +489,7 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) {
return "", ErrNodeNotAssignToRG
}
func (rm *ResourceManager) TransferNode(from, to string) error {
func (rm *ResourceManager) TransferNode(from string, to string, numNode int) error {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
@ -497,57 +497,66 @@ func (rm *ResourceManager) TransferNode(from, to string) error {
return ErrRGNotExist
}
if len(rm.groups[from].nodes) == 0 {
return ErrRGIsEmpty
}
rm.checkRGNodeStatus(from)
rm.checkRGNodeStatus(to)
if len(rm.groups[from].nodes) < numNode {
return ErrNodeNotEnough
}
//todo: a better way to choose a node with least balance cost
node := rm.groups[from].GetNodes()[0]
if err := rm.transferNodeInStore(from, to, node); err != nil {
movedNodes, err := rm.transferNodeInStore(from, to, numNode)
if err != nil {
return err
}
err := rm.groups[from].unassignNode(node)
if err != nil {
// interrupt transfer, unreachable logic path
return err
}
for _, node := range movedNodes {
err := rm.groups[from].unassignNode(node)
if err != nil {
// interrupt transfer, unreachable logic path
return err
}
err = rm.groups[to].assignNode(node)
if err != nil {
// interrupt transfer, unreachable logic path
return err
err = rm.groups[to].assignNode(node)
if err != nil {
// interrupt transfer, unreachable logic path
return err
}
}
return nil
}
func (rm *ResourceManager) transferNodeInStore(from string, to string, node int64) error {
func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode int) ([]int64, error) {
availableNodes := rm.groups[from].GetNodes()
if len(availableNodes) < numNode {
return nil, ErrNodeNotEnough
}
movedNodes := make([]int64, 0, numNode)
fromNodeList := make([]int64, 0)
for nid := range rm.groups[from].nodes {
if nid != node {
fromNodeList = append(fromNodeList, nid)
toNodeList := rm.groups[to].GetNodes()
for i := 0; i < len(availableNodes); i++ {
if i < numNode {
movedNodes = append(movedNodes, availableNodes[i])
toNodeList = append(toNodeList, availableNodes[i])
} else {
fromNodeList = append(fromNodeList, availableNodes[i])
}
}
toNodeList := rm.groups[to].GetNodes()
toNodeList = append(toNodeList, node)
fromRG := &querypb.ResourceGroup{
Name: from,
Capacity: int32(rm.groups[from].GetCapacity()) - 1,
Capacity: int32(rm.groups[from].GetCapacity() - numNode),
Nodes: fromNodeList,
}
toRG := &querypb.ResourceGroup{
Name: to,
Capacity: int32(rm.groups[to].GetCapacity()) + 1,
Capacity: int32(rm.groups[to].GetCapacity() + numNode),
Nodes: toNodeList,
}
return rm.store.SaveResourceGroup(fromRG, toRG)
return movedNodes, rm.store.SaveResourceGroup(fromRG, toRG)
}
// auto recover rg, return recover used node num

View File

@ -115,12 +115,34 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
suite.ErrorIs(err, ErrNodeAlreadyAssign)
// transfer node between rgs
err = suite.manager.TransferNode("rg1", "rg2")
err = suite.manager.TransferNode("rg1", "rg2", 1)
suite.NoError(err)
// transfer meet non exist rg
err = suite.manager.TransferNode("rgggg", "rg2")
err = suite.manager.TransferNode("rgggg", "rg2", 1)
suite.ErrorIs(err, ErrRGNotExist)
err = suite.manager.TransferNode("rg1", "rg2", 5)
suite.ErrorIs(err, ErrNodeNotEnough)
suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(12, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(13, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(14, "localhost"))
suite.manager.AssignNode("rg1", 11)
suite.manager.AssignNode("rg1", 12)
suite.manager.AssignNode("rg1", 13)
suite.manager.AssignNode("rg1", 14)
rg1, err := suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
rg2, err := suite.manager.GetResourceGroup("rg2")
suite.NoError(err)
suite.Equal(rg1.GetCapacity(), 4)
suite.Equal(rg2.GetCapacity(), 1)
suite.manager.TransferNode("rg1", "rg2", 3)
suite.Equal(rg1.GetCapacity(), 1)
suite.Equal(rg2.GetCapacity(), 4)
}
func (suite *ResourceManagerSuite) TestHandleNodeUp() {

View File

@ -1009,6 +1009,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
log := log.Ctx(ctx).With(
zap.String("source", req.GetSourceResourceGroup()),
zap.String("target", req.GetTargetResourceGroup()),
zap.Int32("numNode", req.GetNumNode()),
)
log.Info("transfer node between resource group request received")
@ -1027,7 +1028,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
}
err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup())
err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
if err != nil {
log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), err), nil

View File

@ -473,6 +473,7 @@ func (suite *ServiceSuite) TestTransferNode() {
resp, err := server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg1",
NumNode: 1,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
@ -498,6 +499,40 @@ func (suite *ServiceSuite) TestTransferNode() {
suite.Contains(resp.Reason, meta.ErrRGNotExist.Error())
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
err = server.meta.ResourceManager.AddResourceGroup("rg3")
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg4")
suite.NoError(err)
suite.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(12, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(13, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(14, "localhost"))
suite.meta.ResourceManager.AssignNode("rg3", 11)
suite.meta.ResourceManager.AssignNode("rg3", 12)
suite.meta.ResourceManager.AssignNode("rg3", 13)
suite.meta.ResourceManager.AssignNode("rg3", 14)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg3",
TargetResourceGroup: "rg4",
NumNode: 3,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
nodes, err = server.meta.ResourceManager.GetNodes("rg3")
suite.NoError(err)
suite.Len(nodes, 1)
nodes, err = server.meta.ResourceManager.GetNodes("rg4")
suite.NoError(err)
suite.Len(nodes, 3)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg3",
TargetResourceGroup: "rg4",
NumNode: 3,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
// server unhealthy
server.status.Store(commonpb.StateCode_Abnormal)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{