fix assign node to replica in nodeUp (#22370)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/22402/head
wei liu 2023-02-23 20:55:47 +08:00 committed by GitHub
parent bdf527880a
commit 7808bb518d
12 changed files with 662 additions and 79 deletions

View File

@@ -27,7 +27,14 @@ import (
"go.uber.org/zap"
)
type Controller struct {
type Controller interface {
StartDistInstance(ctx context.Context, nodeID int64)
Remove(nodeID int64)
SyncAll(ctx context.Context)
Stop()
}
type ControllerImpl struct {
mu sync.RWMutex
handlers map[int64]*distHandler
client session.Cluster
@@ -37,7 +44,7 @@ type Controller struct {
scheduler task.Scheduler
}
func (dc *Controller) StartDistInstance(ctx context.Context, nodeID int64) {
func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
dc.mu.Lock()
defer dc.mu.Unlock()
if _, ok := dc.handlers[nodeID]; ok {
@@ -48,7 +55,7 @@ func (dc *Controller) StartDistInstance(ctx context.Context, nodeID int64) {
dc.handlers[nodeID] = h
}
func (dc *Controller) Remove(nodeID int64) {
func (dc *ControllerImpl) Remove(nodeID int64) {
dc.mu.Lock()
defer dc.mu.Unlock()
if h, ok := dc.handlers[nodeID]; ok {
@@ -57,7 +64,7 @@ func (dc *Controller) Remove(nodeID int64) {
}
}
func (dc *Controller) SyncAll(ctx context.Context) {
func (dc *ControllerImpl) SyncAll(ctx context.Context) {
dc.mu.RLock()
defer dc.mu.RUnlock()
@@ -72,7 +79,7 @@ func (dc *Controller) SyncAll(ctx context.Context) {
wg.Wait()
}
func (dc *Controller) Stop() {
func (dc *ControllerImpl) Stop() {
dc.mu.Lock()
defer dc.mu.Unlock()
for _, h := range dc.handlers {
@@ -86,8 +93,8 @@ func NewDistController(
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
scheduler task.Scheduler,
) *Controller {
return &Controller{
) *ControllerImpl {
return &ControllerImpl{
handlers: make(map[int64]*distHandler),
client: client,
nodeManager: nodeManager,
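
Why the interface extraction matters: consumers can now hold dist.Controller instead of the concrete ControllerImpl, and tests can inject the generated MockController. A minimal sketch of that pattern, assuming it sits next to the dist package; the heartBeater type and the test are hypothetical and not part of this commit:

// Sketch only: a hypothetical consumer that depends on dist.Controller.
package dist_test

import (
	"context"
	"testing"

	"github.com/milvus-io/milvus/internal/querycoordv2/dist"
	"github.com/stretchr/testify/mock"
)

// heartBeater stands in for the real consumer (the QueryCoord Server).
type heartBeater struct {
	controller dist.Controller
}

func (h *heartBeater) onNodeUp(ctx context.Context, nodeID int64) {
	// start pulling distribution from the node that just came up
	h.controller.StartDistInstance(ctx, nodeID)
}

func TestHeartBeaterStartsDistInstance(t *testing.T) {
	ctrl := dist.NewMockController(t)
	ctrl.EXPECT().StartDistInstance(mock.Anything, int64(7)).Return()

	hb := &heartBeater{controller: ctrl}
	hb.onNodeUp(context.Background(), 7)
}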

View File

@@ -37,7 +37,7 @@ import (
type DistControllerTestSuite struct {
suite.Suite
controller *Controller
controller *ControllerImpl
mockCluster *session.MockCluster
mockScheduler *task.MockScheduler

View File

@@ -0,0 +1,149 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
package dist
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockController is an autogenerated mock type for the Controller type
type MockController struct {
mock.Mock
}
type MockController_Expecter struct {
mock *mock.Mock
}
func (_m *MockController) EXPECT() *MockController_Expecter {
return &MockController_Expecter{mock: &_m.Mock}
}
// Remove provides a mock function with given fields: nodeID
func (_m *MockController) Remove(nodeID int64) {
_m.Called(nodeID)
}
// MockController_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove'
type MockController_Remove_Call struct {
*mock.Call
}
// Remove is a helper method to define mock.On call
// - nodeID int64
func (_e *MockController_Expecter) Remove(nodeID interface{}) *MockController_Remove_Call {
return &MockController_Remove_Call{Call: _e.mock.On("Remove", nodeID)}
}
func (_c *MockController_Remove_Call) Run(run func(nodeID int64)) *MockController_Remove_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockController_Remove_Call) Return() *MockController_Remove_Call {
_c.Call.Return()
return _c
}
// StartDistInstance provides a mock function with given fields: ctx, nodeID
func (_m *MockController) StartDistInstance(ctx context.Context, nodeID int64) {
_m.Called(ctx, nodeID)
}
// MockController_StartDistInstance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartDistInstance'
type MockController_StartDistInstance_Call struct {
*mock.Call
}
// StartDistInstance is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
func (_e *MockController_Expecter) StartDistInstance(ctx interface{}, nodeID interface{}) *MockController_StartDistInstance_Call {
return &MockController_StartDistInstance_Call{Call: _e.mock.On("StartDistInstance", ctx, nodeID)}
}
func (_c *MockController_StartDistInstance_Call) Run(run func(ctx context.Context, nodeID int64)) *MockController_StartDistInstance_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
})
return _c
}
func (_c *MockController_StartDistInstance_Call) Return() *MockController_StartDistInstance_Call {
_c.Call.Return()
return _c
}
// Stop provides a mock function with given fields:
func (_m *MockController) Stop() {
_m.Called()
}
// MockController_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type MockController_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *MockController_Expecter) Stop() *MockController_Stop_Call {
return &MockController_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *MockController_Stop_Call) Run(run func()) *MockController_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockController_Stop_Call) Return() *MockController_Stop_Call {
_c.Call.Return()
return _c
}
// SyncAll provides a mock function with given fields: ctx
func (_m *MockController) SyncAll(ctx context.Context) {
_m.Called(ctx)
}
// MockController_SyncAll_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncAll'
type MockController_SyncAll_Call struct {
*mock.Call
}
// SyncAll is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockController_Expecter) SyncAll(ctx interface{}) *MockController_SyncAll_Call {
return &MockController_SyncAll_Call{Call: _e.mock.On("SyncAll", ctx)}
}
func (_c *MockController_SyncAll_Call) Run(run func(ctx context.Context)) *MockController_SyncAll_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockController_SyncAll_Call) Return() *MockController_SyncAll_Call {
_c.Call.Return()
return _c
}
type mockConstructorTestingTNewMockController interface {
mock.TestingT
Cleanup(func())
}
// NewMockController creates a new instance of MockController. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockController(t mockConstructorTestingTNewMockController) *MockController {
mock := &MockController{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -449,6 +449,21 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) {
}
// assign new node to default rg
newNodes := rm.groups[DefaultResourceGroupName].GetNodes()
newNodes = append(newNodes, node)
err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: DefaultResourceGroupName,
Capacity: int32(rm.groups[DefaultResourceGroupName].GetCapacity()),
Nodes: newNodes,
})
if err != nil {
log.Info("failed to add node to resource group",
zap.String("rgName", DefaultResourceGroupName),
zap.Int64("node", node),
zap.Error(err),
)
return "", err
}
rm.groups[DefaultResourceGroupName].assignNode(node, 0)
log.Info("HandleNodeUp: add node to default resource group",
zap.String("rgName", DefaultResourceGroupName),
@@ -466,35 +481,56 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) {
}
rgName, err := rm.findResourceGroupByNode(node)
if err == nil {
if err != nil {
return "", ErrNodeNotAssignToRG
}
newNodes := []int64{}
for _, nid := range rm.groups[rgName].GetNodes() {
if nid != node {
newNodes = append(newNodes, nid)
}
}
err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity()),
Nodes: newNodes,
})
if err != nil {
log.Info("failed to add node to resource group",
zap.String("rgName", rgName),
zap.Int64("node", node),
zap.Error(err),
)
return "", err
}
log.Info("HandleNodeDown: remove node from resource group",
zap.String("rgName", rgName),
zap.Int64("node", node),
)
return rgName, rm.groups[rgName].unassignNode(node, 0)
}
return "", ErrNodeNotAssignToRG
}
func (rm *ResourceManager) TransferNode(from string, to string, numNode int) error {
func (rm *ResourceManager) TransferNode(from string, to string, numNode int) ([]int64, error) {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
if rm.groups[from] == nil || rm.groups[to] == nil {
return ErrRGNotExist
return nil, ErrRGNotExist
}
rm.checkRGNodeStatus(from)
rm.checkRGNodeStatus(to)
if len(rm.groups[from].nodes) < numNode {
return ErrNodeNotEnough
return nil, ErrNodeNotEnough
}
// TODO: a better way to choose a node with least balance cost
movedNodes, err := rm.transferNodeInStore(from, to, numNode)
if err != nil {
return err
return nil, err
}
deltaFromCapacity := -1
@@ -510,17 +546,17 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) err
err := rm.groups[from].unassignNode(node, deltaFromCapacity)
if err != nil {
// interrupt transfer, unreachable logic path
return err
return nil, err
}
err = rm.groups[to].assignNode(node, deltaToCapacity)
if err != nil {
// interrupt transfer, unreachable logic path
return err
return nil, err
}
}
return nil
return movedNodes, nil
}
func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode int) ([]int64, error) {
@@ -569,14 +605,16 @@ func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode i
}
// auto recover rg, return the nodes used for recovery
func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error) {
func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) ([]int64, error) {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
if rm.groups[rgName] == nil {
return 0, ErrRGNotExist
return nil, ErrRGNotExist
}
ret := make([]int64, 0)
rm.checkRGNodeStatus(rgName)
lackNodesNum := rm.groups[rgName].LackOfNodes()
nodesInDefault := rm.groups[DefaultResourceGroupName].GetNodes()
@@ -586,17 +624,20 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error)
err := rm.unassignNode(DefaultResourceGroupName, node)
if err != nil {
// interrupt transfer, unreachable logic path
return i + 1, err
return ret, err
}
err = rm.groups[rgName].assignNode(node, 0)
if err != nil {
// roll back, unreachable logic path
rm.assignNode(DefaultResourceGroupName, node)
}
return ret, err
}
return lackNodesNum, nil
ret = append(ret, node)
}
return ret, nil
}
func (rm *ResourceManager) Recover() error {
@@ -614,17 +655,16 @@ func (rm *ResourceManager) Recover() error {
rm.groups[rg.GetName()].assignNode(node, 0)
}
} else {
rm.groups[rg.GetName()] = NewResourceGroup(0)
rm.groups[rg.GetName()] = NewResourceGroup(int(rg.GetCapacity()))
for _, node := range rg.GetNodes() {
rm.groups[rg.GetName()].assignNode(node, 1)
rm.groups[rg.GetName()].assignNode(node, 0)
}
}
rm.checkRGNodeStatus(rg.GetName())
log.Info("Recover resource group",
zap.String("rgName", rg.GetName()),
zap.Int64s("nodes", rg.GetNodes()),
zap.Int32("capacity", rg.GetCapacity()),
zap.Int64s("nodes", rm.groups[rg.GetName()].GetNodes()),
zap.Int("capacity", rm.groups[rg.GetName()].GetCapacity()),
)
}
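
Taken together, the hunks above make the resource manager persist the updated node list to the store before touching the in-memory group, and make TransferNode and AutoRecoverResourceGroup return the node IDs they actually moved. A caller-side sketch of how those return values are meant to be consumed; wireIntoReplicas is a hypothetical callback standing in for utils.AddNodesToCollectionsInRG:

// Sketch only (assumed to live in the same package as ResourceManager).
func transferAndWire(rm *ResourceManager, from, to string, num int,
	wireIntoReplicas func(rgName string, nodes ...int64)) error {
	// TransferNode writes the new node lists to the store first and only then
	// mutates the in-memory groups, returning the IDs of the moved nodes.
	moved, err := rm.TransferNode(from, to, num)
	if err != nil {
		// nothing to wire: either validation failed or the store write failed
		return err
	}
	// Hand the moved nodes to the replicas hosted by the target group.
	wireIntoReplicas(to, moved...)
	return nil
}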

View File

@@ -16,6 +16,7 @@
package meta
import (
"errors"
"testing"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@@ -24,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
@@ -114,14 +116,14 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
suite.ErrorIs(err, ErrNodeAlreadyAssign)
// transfer node between rgs
err = suite.manager.TransferNode("rg1", "rg2", 1)
_, err = suite.manager.TransferNode("rg1", "rg2", 1)
suite.NoError(err)
// transfer meet non exist rg
err = suite.manager.TransferNode("rgggg", "rg2", 1)
_, err = suite.manager.TransferNode("rgggg", "rg2", 1)
suite.ErrorIs(err, ErrRGNotExist)
err = suite.manager.TransferNode("rg1", "rg2", 5)
_, err = suite.manager.TransferNode("rg1", "rg2", 5)
suite.ErrorIs(err, ErrNodeNotEnough)
suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
@@ -188,26 +190,42 @@ func (suite *ResourceManagerSuite) TestRecover() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
err := suite.manager.AddResourceGroup("rg")
suite.manager.nodeMgr.Add(session.NewNodeInfo(4, "localhost"))
err := suite.manager.AddResourceGroup("rg1")
suite.NoError(err)
err = suite.manager.AddResourceGroup("rg2")
suite.NoError(err)
suite.manager.AssignNode("rg", 1)
suite.manager.AssignNode("rg", 2)
suite.manager.AssignNode("rg", 3)
suite.manager.AssignNode("rg1", 1)
suite.manager.AssignNode("rg2", 2)
suite.manager.AssignNode(DefaultResourceGroupName, 3)
suite.manager.AssignNode(DefaultResourceGroupName, 4)
suite.manager.UnassignNode("rg", 3)
suite.manager.HandleNodeDown(2)
suite.manager.HandleNodeDown(3)
// clear resource manager in hack way
delete(suite.manager.groups, "rg")
delete(suite.manager.groups, "rg1")
delete(suite.manager.groups, "rg2")
delete(suite.manager.groups, DefaultResourceGroupName)
suite.manager.Recover()
rg, err := suite.manager.GetResourceGroup("rg")
rg, err := suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
suite.Equal(2, rg.GetCapacity())
suite.True(suite.manager.ContainsNode("rg", 1))
suite.True(suite.manager.ContainsNode("rg", 2))
suite.False(suite.manager.ContainsNode("rg", 3))
suite.Equal(1, rg.GetCapacity())
suite.True(suite.manager.ContainsNode("rg1", 1))
rg, err = suite.manager.GetResourceGroup("rg2")
suite.NoError(err)
suite.Equal(1, rg.GetCapacity())
suite.False(suite.manager.ContainsNode("rg2", 2))
rg, err = suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(DefaultResourceGroupCapacity, rg.GetCapacity())
suite.False(suite.manager.ContainsNode(DefaultResourceGroupName, 3))
suite.True(suite.manager.ContainsNode(DefaultResourceGroupName, 4))
}
func (suite *ResourceManagerSuite) TestCheckOutboundNodes() {
@@ -340,6 +358,46 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
suite.Len(defaultRG.GetNodes(), 6)
}
func (suite *ResourceManagerSuite) TestStoreFailed() {
store := NewMockStore(suite.T())
nodeMgr := session.NewNodeManager()
manager := NewResourceManager(store, nodeMgr)
nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
storeErr := errors.New("store error")
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(storeErr)
store.EXPECT().RemoveResourceGroup(mock.Anything).Return(storeErr)
err := manager.AddResourceGroup("rg")
suite.ErrorIs(err, storeErr)
manager.groups["rg"] = &ResourceGroup{
nodes: typeutil.NewUniqueSet(),
capacity: 0,
}
err = manager.RemoveResourceGroup("rg")
suite.ErrorIs(err, storeErr)
err = manager.AssignNode("rg", 1)
suite.ErrorIs(err, storeErr)
manager.groups["rg"].assignNode(1, 1)
err = manager.UnassignNode("rg", 1)
suite.ErrorIs(err, storeErr)
_, err = manager.TransferNode("rg", DefaultResourceGroupName, 1)
suite.ErrorIs(err, storeErr)
_, err = manager.HandleNodeUp(2)
suite.ErrorIs(err, storeErr)
_, err = manager.HandleNodeDown(1)
suite.ErrorIs(err, storeErr)
}
func (suite *ResourceManagerSuite) TearDownSuite() {
suite.kv.Close()
}

View File

@@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"go.uber.org/zap"
)
@@ -93,14 +94,16 @@ func (ob *ResourceObserver) checkResourceGroup() {
)
if enableRGAutoRecover {
usedNodeNum, err := manager.AutoRecoverResourceGroup(rgName)
nodes, err := manager.AutoRecoverResourceGroup(rgName)
if err != nil {
log.Warn("failed to recover resource group",
zap.String("rgName", rgName),
zap.Int("lackNodeNum", lackNodeNum-usedNodeNum),
zap.Int("lackNodeNum", lackNodeNum-len(nodes)),
zap.Error(err),
)
}
utils.AddNodesToCollectionsInRG(ob.meta, rgName, nodes...)
}
}
}

View File

@@ -17,14 +17,19 @@ package observers
import (
"context"
"errors"
"testing"
"time"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
@@ -33,6 +38,7 @@ type ResourceObserverSuite struct {
kv *etcdKV.EtcdKV
//dependency
store *meta.MockStore
meta *meta.Meta
observer *ResourceObserver
nodeMgr *session.NodeManager
@@ -61,43 +67,137 @@ func (suite *ResourceObserverSuite) SetupTest() {
suite.kv = etcdKV.NewEtcdKV(cli, config.MetaRootPath)
// meta
store := meta.NewMetaStore(suite.kv)
suite.store = meta.NewMockStore(suite.T())
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
suite.meta = meta.NewMeta(idAllocator, suite.store, suite.nodeMgr)
suite.observer = NewResourceObserver(suite.meta)
suite.observer.Start(context.TODO())
for i := 1; i < 10; i++ {
suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
for i := 0; i < 10; i++ {
suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(i))
}
}
func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil)
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{100, 101},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(100, 101),
))
// hack all node down from replica
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
suite.meta.ResourceManager.AddResourceGroup("rg")
suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost"))
suite.meta.ResourceManager.AssignNode("rg", 100)
suite.meta.ResourceManager.AssignNode("rg", 101)
suite.meta.ResourceManager.AssignNode("rg", 102)
suite.meta.ResourceManager.AssignNode("rg", 103)
suite.meta.ResourceManager.HandleNodeDown(100)
suite.meta.ResourceManager.HandleNodeDown(101)
// before auto recover rg
suite.Eventually(func() bool {
lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
return lackNodesNum == 2
nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
return lackNodesNum == 2 && len(nodesInReplica) == 0
}, 5*time.Second, 1*time.Second)
// after auto recover rg
suite.Eventually(func() bool {
lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
return lackNodesNum == 0
nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
return lackNodesNum == 0 && len(nodesInReplica) == 2
}, 5*time.Second, 1*time.Second)
}
func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() {
suite.meta.ResourceManager.AddResourceGroup("rg")
for i := 100; i < 200; i++ {
suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
suite.meta.ResourceManager.AssignNode("rg", int64(i))
suite.meta.ResourceManager.HandleNodeDown(int64(i))
}
suite.Eventually(func() bool {
lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
return lackNodesNum == 90
}, 5*time.Second, 1*time.Second)
}
func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(2)
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{100, 101},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(100, 101),
))
// hack all node down from replica
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error"))
suite.meta.ResourceManager.AddResourceGroup("rg")
suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost"))
suite.meta.ResourceManager.AssignNode("rg", 100)
suite.meta.ResourceManager.AssignNode("rg", 101)
suite.meta.ResourceManager.AssignNode("rg", 102)
suite.meta.ResourceManager.AssignNode("rg", 103)
suite.meta.ResourceManager.HandleNodeDown(100)
suite.meta.ResourceManager.HandleNodeDown(101)
// before auto recover rg
suite.Eventually(func() bool {
lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
return lackNodesNum == 2 && len(nodesInReplica) == 0
}, 5*time.Second, 1*time.Second)
// after auto recover rg
suite.Eventually(func() bool {
lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
return lackNodesNum == 0 && len(nodesInReplica) == 0
}, 5*time.Second, 1*time.Second)
}
func (suite *ResourceObserverSuite) TearDownSuite() {

View File

@@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"os"
"sort"
"strconv"
"sync"
"sync/atomic"
@@ -95,7 +94,7 @@ type Server struct {
taskScheduler task.Scheduler
// HeartBeat
distController *dist.Controller
distController dist.Controller
// Checkers
checkerController *checkers.CheckerController
@@ -650,29 +649,10 @@ func (s *Server) handleNodeUp(node int64) {
zap.String("resourceGroup", rgName),
)
for _, collection := range s.meta.CollectionManager.GetAll() {
log := log.With(zap.Int64("collectionID", collection))
replica := s.meta.ReplicaManager.GetByCollectionAndNode(collection, node)
if replica == nil {
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(collection, rgName)
if len(replicas) == 0 {
continue
}
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].Len() < replicas[j].Len()
})
replica := replicas[0]
// TODO(yah01): this may fail, need a component to check whether a node is assigned
err = s.meta.ReplicaManager.AddNode(replica.GetID(), node)
if err != nil {
log.Warn("failed to assign node to replicas",
zap.Int64("replicaID", replica.GetID()),
zap.Error(err),
)
}
log.Info("assign node to replica",
zap.Int64("replicaID", replica.GetID()))
}
rgs := s.meta.ResourceManager.ListResourceGroups()
if len(rgs) == 1 {
// only __default_resource_group exists
utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
}
}
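
handleNodeUp no longer walks collections itself: after the node lands in the default resource group, replicas are updated directly only when the default group is the sole resource group; otherwise the ResourceObserver's auto-recover path takes over. A hedged reconstruction of the resulting flow, pieced together from this hunk and the mock expectations in TestHandleNodeUp; it may not match the real function line for line, and the log messages are illustrative:

func (s *Server) handleNodeUp(node int64) {
	s.taskScheduler.AddExecutor(node)
	s.distController.StartDistInstance(s.ctx, node)

	rgName, err := s.meta.ResourceManager.HandleNodeUp(node)
	if err != nil {
		// message text is illustrative, not copied from the commit
		log.Warn("failed to assign node to a resource group", zap.Int64("nodeID", node), zap.Error(err))
		return
	}
	log.Info("node up",
		zap.Int64("nodeID", node),
		zap.String("resourceGroup", rgName),
	)

	rgs := s.meta.ResourceManager.ListResourceGroups()
	if len(rgs) == 1 {
		// only __default_resource_group exists: wire the node into the
		// replicas of every loaded collection right away
		utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
	}
	// with more resource groups present, replica assignment is deferred to
	// the ResourceObserver, which recovers groups and then calls
	// utils.AddNodesToCollectionsInRG with the recovered nodes
}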

View File

@@ -1051,12 +1051,30 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
fmt.Sprintf("transfer node num can't be [%d]", req.GetNumNode()), nil), nil
}
err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
replicasInSource := s.meta.ReplicaManager.GetByResourceGroup(req.GetSourceResourceGroup())
replicasInTarget := s.meta.ReplicaManager.GetByResourceGroup(req.GetTargetResourceGroup())
loadSameCollection := false
for _, r1 := range replicasInSource {
for _, r2 := range replicasInTarget {
if r1.GetCollectionID() == r2.GetCollectionID() {
loadSameCollection = true
}
}
}
if loadSameCollection {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("can't transfer node, cause the resource group[%s] and the resource group[%s] loaded same collection",
req.GetSourceResourceGroup(), req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
}
nodes, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
if err != nil {
log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), err), nil
}
utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...)
return successStatus, nil
}
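
The guard added above rejects the transfer when the source and target resource groups host replicas of the same collection. The same check, factored into a hypothetical helper for clarity (the commit inlines the nested loop; sharesCollection is not part of the change):

// Sketch only.
func sharesCollection(src, dst []*meta.Replica) bool {
	loaded := make(map[int64]struct{}, len(src))
	for _, r := range src {
		loaded[r.GetCollectionID()] = struct{}{}
	}
	for _, r := range dst {
		if _, ok := loaded[r.GetCollectionID()]; ok {
			// both groups hold a replica of this collection
			return true
		}
	}
	return false
}

Keying a set by collection ID avoids the quadratic scan of the inline version; with the handful of replicas a resource group typically holds, either form is fine.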

View File

@@ -27,10 +27,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/dist"
"github.com/milvus-io/milvus/internal/querycoordv2/job"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
@@ -74,6 +76,7 @@ type ServiceSuite struct {
balancer balance.Balance
distMgr *meta.DistributionManager
distController *dist.MockController
// Test object
server *Server
@@ -156,6 +159,7 @@ func (suite *ServiceSuite) SetupTest() {
)
meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()
suite.distMgr = meta.NewDistributionManager()
suite.distController = dist.NewMockController(suite.T())
suite.server = &Server{
kv: suite.kv,
@@ -172,6 +176,8 @@ func (suite *ServiceSuite) SetupTest() {
jobScheduler: suite.jobScheduler,
taskScheduler: suite.taskScheduler,
balancer: suite.balancer,
distController: suite.distController,
ctx: context.Background(),
}
suite.server.collectionObserver = observers.NewCollectionObserver(
suite.server.dist,
@@ -469,6 +475,16 @@ func (suite *ServiceSuite) TestTransferNode() {
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg2")
suite.NoError(err)
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg1",
},
typeutil.NewUniqueSet(),
))
// test transfer node
resp, err := server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
@@ -480,6 +496,26 @@ func (suite *ServiceSuite) TestTransferNode() {
nodes, err := server.meta.ResourceManager.GetNodes("rg1")
suite.NoError(err)
suite.Len(nodes, 1)
nodesInReplica := server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodesInReplica, 1)
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg2",
},
typeutil.NewUniqueSet(),
))
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg1",
TargetResourceGroup: "rg2",
NumNode: 1,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, "can't transfer node")
// test transfer node meet non-exist source rg
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
@@ -1531,6 +1567,38 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
}
}
func (suite *ServiceSuite) TestHandleNodeUp() {
server := suite.server
suite.server.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
suite.server.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: meta.DefaultResourceGroupName,
},
typeutil.NewUniqueSet(),
))
suite.taskScheduler.EXPECT().AddExecutor(mock.Anything)
suite.distController.EXPECT().StartDistInstance(mock.Anything, mock.Anything)
suite.nodeMgr.Add(session.NewNodeInfo(111, "localhost"))
server.handleNodeUp(111)
nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodes, 1)
suite.Equal(int64(111), nodes[0])
log.Info("handleNodeUp")
// when more resource groups exist, a new node shouldn't be assigned to replicas in the default rg by handleNodeUp
suite.server.meta.ResourceManager.AddResourceGroup("rg")
suite.nodeMgr.Add(session.NewNodeInfo(222, "localhost"))
server.handleNodeUp(222)
nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodes, 1)
suite.Equal(int64(111), nodes[0])
}
func (suite *ServiceSuite) loadAll() {
ctx := context.Background()
for _, collection := range suite.collections {

View File

@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math/rand"
"sort"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -146,6 +147,39 @@ func AssignNodesToReplicas(m *meta.Meta, rgName string, replicas ...*meta.Replic
return nil
}
// AddNodesToCollectionsInRG adds the given nodes to all collections in rgName.
// For each collection, each node is added to the replica with the fewest nodes.
func AddNodesToCollectionsInRG(m *meta.Meta, rgName string, nodes ...int64) {
for _, node := range nodes {
for _, collection := range m.CollectionManager.GetAll() {
log := log.With(zap.Int64("collectionID", collection))
replica := m.ReplicaManager.GetByCollectionAndNode(collection, node)
if replica == nil {
replicas := m.ReplicaManager.GetByCollectionAndRG(collection, rgName)
if len(replicas) == 0 {
continue
}
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].Len() < replicas[j].Len()
})
replica := replicas[0]
// TODO(yah01): this may fail, need a component to check whether a node is assigned
err := m.ReplicaManager.AddNode(replica.GetID(), node)
if err != nil {
log.Warn("failed to assign node to replicas",
zap.Int64("replicaID", replica.GetID()),
zap.Int64("nodeId", node),
zap.Error(err),
)
continue
}
log.Info("assign node to replica",
zap.Int64("replicaID", replica.GetID()))
}
}
}
}
// SpawnReplicas spawns replicas for given collection, assign nodes to them, and save them
func SpawnAllReplicasInRG(m *meta.Meta, collection int64, replicaNumber int32, rgName string) ([]*meta.Replica, error) {
replicas, err := m.ReplicaManager.Spawn(collection, replicaNumber, rgName)

View File

@@ -17,14 +17,18 @@
package utils
import (
"errors"
"testing"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestSpawnReplicasWithRG(t *testing.T) {
@@ -110,3 +114,125 @@ func TestSpawnReplicasWithRG(t *testing.T) {
})
}
}
func TestAddNodesToCollectionsInRGFailed(t *testing.T) {
Params.Init()
store := meta.NewMockStore(t)
store.EXPECT().SaveCollection(mock.Anything).Return(nil)
store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(4)
store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg")
m.CollectionManager.PutCollection(CreateTestCollection(1, 2))
m.CollectionManager.PutCollection(CreateTestCollection(2, 2))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 3,
CollectionID: 2,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 4,
CollectionID: 2,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
storeErr := errors.New("store error")
store.EXPECT().SaveReplica(mock.Anything).Return(storeErr)
AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...)
assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 0)
assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 0)
assert.Len(t, m.ReplicaManager.Get(3).GetNodes(), 0)
assert.Len(t, m.ReplicaManager.Get(4).GetNodes(), 0)
}
func TestAddNodesToCollectionsInRG(t *testing.T) {
Params.Init()
store := meta.NewMockStore(t)
store.EXPECT().SaveCollection(mock.Anything).Return(nil)
store.EXPECT().SaveReplica(mock.Anything).Return(nil)
store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg")
m.CollectionManager.PutCollection(CreateTestCollection(1, 2))
m.CollectionManager.PutCollection(CreateTestCollection(2, 2))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 3,
CollectionID: 2,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
m.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
ID: 4,
CollectionID: 2,
Nodes: []int64{},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(),
))
AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...)
assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 2)
assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 2)
assert.Len(t, m.ReplicaManager.Get(3).GetNodes(), 2)
assert.Len(t, m.ReplicaManager.Get(4).GetNodes(), 2)
}