From a9a263d5a897b8801630bf9a0f5f0b745099c733 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Thu, 23 Feb 2023 14:15:45 +0800
Subject: [PATCH] fix assign node to replica in nodeUp (#22323)

Signed-off-by: Wei Liu
---
 Makefile                                        |   1 +
 go.mod                                          |   3 +-
 go.sum                                          |   2 -
 internal/querycoordv2/dist/dist_controller.go   |  21 ++-
 .../querycoordv2/dist/dist_controller_test.go   |   2 +-
 internal/querycoordv2/dist/mock_controller.go   | 149 ++++++++++++++++++
 .../querycoordv2/meta/resource_manager.go       |  86 +++++++---
 .../meta/resource_manager_test.go               |  86 ++++++++--
 .../observers/resource_observer.go              |   7 +-
 .../observers/resource_observer_test.go         | 112 ++++++++++++-
 internal/querycoordv2/server.go                 |  31 +---
 internal/querycoordv2/services.go               |  20 ++-
 internal/querycoordv2/services_test.go          |  70 +++++++-
 internal/querycoordv2/utils/meta.go             |  34 ++++
 internal/querycoordv2/utils/meta_test.go        | 127 +++++++++++++++
 15 files changed, 668 insertions(+), 83 deletions(-)
 create mode 100644 internal/querycoordv2/dist/mock_controller.go

diff --git a/Makefile b/Makefile
index 67a1cce28b..34d04590c5 100644
--- a/Makefile
+++ b/Makefile
@@ -327,6 +327,7 @@ generate-mockery: getdeps
 	$(PWD)/bin/mockery --name=Cluster --dir=$(PWD)/internal/querycoordv2/session --output=$(PWD)/internal/querycoordv2/session --filename=mock_cluster.go --with-expecter --structname=MockCluster --outpkg=session --inpackage
 	$(PWD)/bin/mockery --name=Store --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_store.go --with-expecter --structname=MockStore --outpkg=meta --inpackage
 	$(PWD)/bin/mockery --name=Balance --dir=$(PWD)/internal/querycoordv2/balance --output=$(PWD)/internal/querycoordv2/balance --filename=mock_balancer.go --with-expecter --structname=MockBalancer --outpkg=balance --inpackage
+	$(PWD)/bin/mockery --name=Controller --dir=$(PWD)/internal/querycoordv2/dist --output=$(PWD)/internal/querycoordv2/dist --filename=mock_controller.go --with-expecter --structname=MockController --outpkg=dist --inpackage
 	# internal/querynode
 	$(PWD)/bin/mockery --name=TSafeReplicaInterface --dir=$(PWD)/internal/querynode --output=$(PWD)/internal/querynode --filename=mock_tsafe_replica_test.go --with-expecter --structname=MockTSafeReplicaInterface --outpkg=querynode --inpackage
 	# internal/rootcoord
diff --git a/go.mod b/go.mod
index fb789e1001..af33de4492 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
 	github.com/antonmedv/expr v1.8.9
 	github.com/apache/arrow/go/v8 v8.0.0-20220322092137-778b1772fd20
 	github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7
-	github.com/apache/thrift v0.15.0 // indirect
+	github.com/apache/thrift v0.15.0
 	github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b
 	github.com/bits-and-blooms/bloom/v3 v3.0.1
 	github.com/casbin/casbin/v2 v2.44.2
@@ -182,6 +182,7 @@ require (
 require github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect
 
 require (
+	github.com/golang/mock v1.5.0
 	github.com/uber/jaeger-client-go v2.30.0+incompatible
 	go.opentelemetry.io/otel/exporters/jaeger v1.11.2
 	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.2
diff --git a/go.sum b/go.sum
index a2fa0ce8f9..b4b2f007f6 100644
--- a/go.sum
+++ b/go.sum
@@ -491,8 +491,6 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api v0.0.0-20230209081028-aabbca7f95ae h1:4PPf72uc+pUFIT22yUHKrMMVyiJu8Q5l8FrQ4IkvAAY= -github.com/milvus-io/milvus-proto/go-api v0.0.0-20230209081028-aabbca7f95ae/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-3cf200738ae7 h1:G3qXTVWaHXQkgnPOxuCb/NIxb2oD4xHHViyuTGs5KQU= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-3cf200738ae7/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= diff --git a/internal/querycoordv2/dist/dist_controller.go b/internal/querycoordv2/dist/dist_controller.go index f319de0fde..4bfe836dd2 100644 --- a/internal/querycoordv2/dist/dist_controller.go +++ b/internal/querycoordv2/dist/dist_controller.go @@ -27,7 +27,14 @@ import ( "go.uber.org/zap" ) -type Controller struct { +type Controller interface { + StartDistInstance(ctx context.Context, nodeID int64) + Remove(nodeID int64) + SyncAll(ctx context.Context) + Stop() +} + +type ControllerImpl struct { mu sync.RWMutex handlers map[int64]*distHandler client session.Cluster @@ -37,7 +44,7 @@ type Controller struct { scheduler task.Scheduler } -func (dc *Controller) StartDistInstance(ctx context.Context, nodeID int64) { +func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) { dc.mu.Lock() defer dc.mu.Unlock() if _, ok := dc.handlers[nodeID]; ok { @@ -48,7 +55,7 @@ func (dc *Controller) StartDistInstance(ctx context.Context, nodeID int64) { dc.handlers[nodeID] = h } -func (dc *Controller) Remove(nodeID int64) { +func (dc *ControllerImpl) Remove(nodeID int64) { dc.mu.Lock() defer dc.mu.Unlock() if h, ok := dc.handlers[nodeID]; ok { @@ -57,7 +64,7 @@ func (dc *Controller) Remove(nodeID int64) { } } -func (dc *Controller) SyncAll(ctx context.Context) { +func (dc *ControllerImpl) SyncAll(ctx context.Context) { dc.mu.RLock() defer dc.mu.RUnlock() @@ -72,7 +79,7 @@ func (dc *Controller) SyncAll(ctx context.Context) { wg.Wait() } -func (dc *Controller) Stop() { +func (dc *ControllerImpl) Stop() { dc.mu.Lock() defer dc.mu.Unlock() for _, h := range dc.handlers { @@ -86,8 +93,8 @@ func NewDistController( dist *meta.DistributionManager, targetMgr *meta.TargetManager, scheduler task.Scheduler, -) *Controller { - return &Controller{ +) *ControllerImpl { + return &ControllerImpl{ handlers: make(map[int64]*distHandler), client: client, nodeManager: nodeManager, diff --git a/internal/querycoordv2/dist/dist_controller_test.go b/internal/querycoordv2/dist/dist_controller_test.go index c08afb9018..0f0ff913d6 100644 --- a/internal/querycoordv2/dist/dist_controller_test.go +++ b/internal/querycoordv2/dist/dist_controller_test.go @@ -37,7 +37,7 @@ import ( type DistControllerTestSuite struct { suite.Suite - controller *Controller + controller *ControllerImpl mockCluster *session.MockCluster mockScheduler *task.MockScheduler diff --git a/internal/querycoordv2/dist/mock_controller.go b/internal/querycoordv2/dist/mock_controller.go new file mode 100644 index 0000000000..981b01f9cc --- /dev/null +++ b/internal/querycoordv2/dist/mock_controller.go @@ -0,0 +1,149 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. 
+ +package dist + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockController is an autogenerated mock type for the Controller type +type MockController struct { + mock.Mock +} + +type MockController_Expecter struct { + mock *mock.Mock +} + +func (_m *MockController) EXPECT() *MockController_Expecter { + return &MockController_Expecter{mock: &_m.Mock} +} + +// Remove provides a mock function with given fields: nodeID +func (_m *MockController) Remove(nodeID int64) { + _m.Called(nodeID) +} + +// MockController_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove' +type MockController_Remove_Call struct { + *mock.Call +} + +// Remove is a helper method to define mock.On call +// - nodeID int64 +func (_e *MockController_Expecter) Remove(nodeID interface{}) *MockController_Remove_Call { + return &MockController_Remove_Call{Call: _e.mock.On("Remove", nodeID)} +} + +func (_c *MockController_Remove_Call) Run(run func(nodeID int64)) *MockController_Remove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockController_Remove_Call) Return() *MockController_Remove_Call { + _c.Call.Return() + return _c +} + +// StartDistInstance provides a mock function with given fields: ctx, nodeID +func (_m *MockController) StartDistInstance(ctx context.Context, nodeID int64) { + _m.Called(ctx, nodeID) +} + +// MockController_StartDistInstance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartDistInstance' +type MockController_StartDistInstance_Call struct { + *mock.Call +} + +// StartDistInstance is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +func (_e *MockController_Expecter) StartDistInstance(ctx interface{}, nodeID interface{}) *MockController_StartDistInstance_Call { + return &MockController_StartDistInstance_Call{Call: _e.mock.On("StartDistInstance", ctx, nodeID)} +} + +func (_c *MockController_StartDistInstance_Call) Run(run func(ctx context.Context, nodeID int64)) *MockController_StartDistInstance_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockController_StartDistInstance_Call) Return() *MockController_StartDistInstance_Call { + _c.Call.Return() + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockController) Stop() { + _m.Called() +} + +// MockController_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockController_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockController_Expecter) Stop() *MockController_Stop_Call { + return &MockController_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockController_Stop_Call) Run(run func()) *MockController_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockController_Stop_Call) Return() *MockController_Stop_Call { + _c.Call.Return() + return _c +} + +// SyncAll provides a mock function with given fields: ctx +func (_m *MockController) SyncAll(ctx context.Context) { + _m.Called(ctx) +} + +// MockController_SyncAll_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncAll' +type MockController_SyncAll_Call struct { + *mock.Call +} + +// SyncAll is a helper method to define mock.On call +// - ctx context.Context 
+func (_e *MockController_Expecter) SyncAll(ctx interface{}) *MockController_SyncAll_Call {
+	return &MockController_SyncAll_Call{Call: _e.mock.On("SyncAll", ctx)}
+}
+
+func (_c *MockController_SyncAll_Call) Run(run func(ctx context.Context)) *MockController_SyncAll_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context))
+	})
+	return _c
+}
+
+func (_c *MockController_SyncAll_Call) Return() *MockController_SyncAll_Call {
+	_c.Call.Return()
+	return _c
+}
+
+type mockConstructorTestingTNewMockController interface {
+	mock.TestingT
+	Cleanup(func())
+}
+
+// NewMockController creates a new instance of MockController. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+func NewMockController(t mockConstructorTestingTNewMockController) *MockController {
+	mock := &MockController{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go
index f69d6557f0..6262e35147 100644
--- a/internal/querycoordv2/meta/resource_manager.go
+++ b/internal/querycoordv2/meta/resource_manager.go
@@ -449,6 +449,21 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) {
 	}
 
 	// assign new node to default rg
+	newNodes := rm.groups[DefaultResourceGroupName].GetNodes()
+	newNodes = append(newNodes, node)
+	err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
+		Name:     DefaultResourceGroupName,
+		Capacity: int32(rm.groups[DefaultResourceGroupName].GetCapacity()),
+		Nodes:    newNodes,
+	})
+	if err != nil {
+		log.Info("failed to add node to resource group",
+			zap.String("rgName", DefaultResourceGroupName),
+			zap.Int64("node", node),
+			zap.Error(err),
+		)
+		return "", err
+	}
 	rm.groups[DefaultResourceGroupName].assignNode(node, 0)
 	log.Info("HandleNodeUp: add node to default resource group",
 		zap.String("rgName", DefaultResourceGroupName),
@@ -466,35 +481,56 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) {
 	}
 
 	rgName, err := rm.findResourceGroupByNode(node)
-	if err == nil {
-		log.Info("HandleNodeDown: remove node from resource group",
-			zap.String("rgName", rgName),
-			zap.Int64("node", node),
-		)
-		return rgName, rm.groups[rgName].unassignNode(node, 0)
+	if err != nil {
+		return "", ErrNodeNotAssignToRG
+	}
 
-	return "", ErrNodeNotAssignToRG
+	newNodes := []int64{}
+	for _, nid := range rm.groups[rgName].GetNodes() {
+		if nid != node {
+			newNodes = append(newNodes, nid)
+		}
+	}
+	err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
+		Name:     rgName,
+		Capacity: int32(rm.groups[rgName].GetCapacity()),
+		Nodes:    newNodes,
+	})
+	if err != nil {
+		log.Info("failed to remove node from resource group",
+			zap.String("rgName", rgName),
+			zap.Int64("node", node),
+			zap.Error(err),
+		)
+		return "", err
+	}
+
+	log.Info("HandleNodeDown: remove node from resource group",
+		zap.String("rgName", rgName),
+		zap.Int64("node", node),
+	)
+	return rgName, rm.groups[rgName].unassignNode(node, 0)
 }
 
-func (rm *ResourceManager) TransferNode(from string, to string, numNode int) error {
+func (rm *ResourceManager) TransferNode(from string, to string, numNode int) ([]int64, error) {
 	rm.rwmutex.Lock()
 	defer rm.rwmutex.Unlock()
 
 	if rm.groups[from] == nil || rm.groups[to] == nil {
-		return ErrRGNotExist
+		return nil, ErrRGNotExist
 	}
 
 	rm.checkRGNodeStatus(from)
 	rm.checkRGNodeStatus(to)
 	if len(rm.groups[from].nodes) < numNode {
-		return ErrNodeNotEnough
+		return nil, ErrNodeNotEnough
 	}
 
 	//todo: a
better way to choose a node with least balance cost movedNodes, err := rm.transferNodeInStore(from, to, numNode) if err != nil { - return err + return nil, err } deltaFromCapacity := -1 @@ -510,17 +546,17 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) err err := rm.groups[from].unassignNode(node, deltaFromCapacity) if err != nil { // interrupt transfer, unreachable logic path - return err + return nil, err } err = rm.groups[to].assignNode(node, deltaToCapacity) if err != nil { // interrupt transfer, unreachable logic path - return err + return nil, err } } - return nil + return movedNodes, nil } func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode int) ([]int64, error) { @@ -569,14 +605,16 @@ func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode i } // auto recover rg, return recover used node num -func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error) { +func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) ([]int64, error) { rm.rwmutex.Lock() defer rm.rwmutex.Unlock() if rm.groups[rgName] == nil { - return 0, ErrRGNotExist + return nil, ErrRGNotExist } + ret := make([]int64, 0) + rm.checkRGNodeStatus(rgName) lackNodesNum := rm.groups[rgName].LackOfNodes() nodesInDefault := rm.groups[DefaultResourceGroupName].GetNodes() @@ -586,17 +624,20 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error) err := rm.unassignNode(DefaultResourceGroupName, node) if err != nil { // interrupt transfer, unreachable logic path - return i + 1, err + return ret, err } err = rm.groups[rgName].assignNode(node, 0) if err != nil { // roll back, unreachable logic path rm.assignNode(DefaultResourceGroupName, node) + return ret, err } + + ret = append(ret, node) } - return lackNodesNum, nil + return ret, nil } func (rm *ResourceManager) Recover() error { @@ -614,17 +655,16 @@ func (rm *ResourceManager) Recover() error { rm.groups[rg.GetName()].assignNode(node, 0) } } else { - rm.groups[rg.GetName()] = NewResourceGroup(0) + rm.groups[rg.GetName()] = NewResourceGroup(int(rg.GetCapacity())) for _, node := range rg.GetNodes() { - rm.groups[rg.GetName()].assignNode(node, 1) + rm.groups[rg.GetName()].assignNode(node, 0) } } - rm.checkRGNodeStatus(rg.GetName()) log.Info("Recover resource group", zap.String("rgName", rg.GetName()), - zap.Int64s("nodes", rg.GetNodes()), - zap.Int32("capacity", rg.GetCapacity()), + zap.Int64s("nodes", rm.groups[rg.GetName()].GetNodes()), + zap.Int("capacity", rm.groups[rg.GetName()].GetCapacity()), ) } diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 152e12cdd7..d41708a92c 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -16,6 +16,7 @@ package meta import ( + "errors" "testing" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -24,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -114,14 +116,14 @@ func (suite *ResourceManagerSuite) TestManipulateNode() { suite.ErrorIs(err, ErrNodeAlreadyAssign) // transfer node between rgs - err = suite.manager.TransferNode("rg1", "rg2", 1) + _, err = suite.manager.TransferNode("rg1", "rg2", 1) suite.NoError(err) // transfer meet non exist rg - err 
= suite.manager.TransferNode("rgggg", "rg2", 1) + _, err = suite.manager.TransferNode("rgggg", "rg2", 1) suite.ErrorIs(err, ErrRGNotExist) - err = suite.manager.TransferNode("rg1", "rg2", 5) + _, err = suite.manager.TransferNode("rg1", "rg2", 5) suite.ErrorIs(err, ErrNodeNotEnough) suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost")) @@ -188,26 +190,42 @@ func (suite *ResourceManagerSuite) TestRecover() { suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost")) - err := suite.manager.AddResourceGroup("rg") + suite.manager.nodeMgr.Add(session.NewNodeInfo(4, "localhost")) + err := suite.manager.AddResourceGroup("rg1") + suite.NoError(err) + err = suite.manager.AddResourceGroup("rg2") suite.NoError(err) - suite.manager.AssignNode("rg", 1) - suite.manager.AssignNode("rg", 2) - suite.manager.AssignNode("rg", 3) + suite.manager.AssignNode("rg1", 1) + suite.manager.AssignNode("rg2", 2) + suite.manager.AssignNode(DefaultResourceGroupName, 3) + suite.manager.AssignNode(DefaultResourceGroupName, 4) - suite.manager.UnassignNode("rg", 3) + suite.manager.HandleNodeDown(2) + suite.manager.HandleNodeDown(3) // clear resource manager in hack way - delete(suite.manager.groups, "rg") + delete(suite.manager.groups, "rg1") + delete(suite.manager.groups, "rg2") delete(suite.manager.groups, DefaultResourceGroupName) suite.manager.Recover() - rg, err := suite.manager.GetResourceGroup("rg") + rg, err := suite.manager.GetResourceGroup("rg1") suite.NoError(err) - suite.Equal(2, rg.GetCapacity()) - suite.True(suite.manager.ContainsNode("rg", 1)) - suite.True(suite.manager.ContainsNode("rg", 2)) - suite.False(suite.manager.ContainsNode("rg", 3)) + suite.Equal(1, rg.GetCapacity()) + suite.True(suite.manager.ContainsNode("rg1", 1)) + print(suite.manager.GetNodes("rg1")) + + rg, err = suite.manager.GetResourceGroup("rg2") + suite.NoError(err) + suite.Equal(1, rg.GetCapacity()) + suite.False(suite.manager.ContainsNode("rg2", 2)) + + rg, err = suite.manager.GetResourceGroup(DefaultResourceGroupName) + suite.NoError(err) + suite.Equal(DefaultResourceGroupCapacity, rg.GetCapacity()) + suite.False(suite.manager.ContainsNode(DefaultResourceGroupName, 3)) + suite.True(suite.manager.ContainsNode(DefaultResourceGroupName, 4)) } func (suite *ResourceManagerSuite) TestCheckOutboundNodes() { @@ -340,6 +358,46 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() { suite.Len(defaultRG.GetNodes(), 6) } +func (suite *ResourceManagerSuite) TestStoreFailed() { + store := NewMockStore(suite.T()) + nodeMgr := session.NewNodeManager() + manager := NewResourceManager(store, nodeMgr) + + nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + nodeMgr.Add(session.NewNodeInfo(3, "localhost")) + storeErr := errors.New("store error") + store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(storeErr) + store.EXPECT().RemoveResourceGroup(mock.Anything).Return(storeErr) + + err := manager.AddResourceGroup("rg") + suite.ErrorIs(err, storeErr) + + manager.groups["rg"] = &ResourceGroup{ + nodes: typeutil.NewUniqueSet(), + capacity: 0, + } + + err = manager.RemoveResourceGroup("rg") + suite.ErrorIs(err, storeErr) + + err = manager.AssignNode("rg", 1) + suite.ErrorIs(err, storeErr) + + manager.groups["rg"].assignNode(1, 1) + err = manager.UnassignNode("rg", 1) + suite.ErrorIs(err, storeErr) + + _, err = manager.TransferNode("rg", 
DefaultResourceGroupName, 1) + suite.ErrorIs(err, storeErr) + + _, err = manager.HandleNodeUp(2) + suite.ErrorIs(err, storeErr) + + _, err = manager.HandleNodeDown(1) + suite.ErrorIs(err, storeErr) +} + func (suite *ResourceManagerSuite) TearDownSuite() { suite.kv.Close() } diff --git a/internal/querycoordv2/observers/resource_observer.go b/internal/querycoordv2/observers/resource_observer.go index 7c78241291..7b3a775852 100644 --- a/internal/querycoordv2/observers/resource_observer.go +++ b/internal/querycoordv2/observers/resource_observer.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "go.uber.org/zap" ) @@ -93,14 +94,16 @@ func (ob *ResourceObserver) checkResourceGroup() { ) if enableRGAutoRecover { - usedNodeNum, err := manager.AutoRecoverResourceGroup(rgName) + nodes, err := manager.AutoRecoverResourceGroup(rgName) if err != nil { log.Warn("failed to recover resource group", zap.String("rgName", rgName), - zap.Int("lackNodeNum", lackNodeNum-usedNodeNum), + zap.Int("lackNodeNum", lackNodeNum-len(nodes)), zap.Error(err), ) } + + utils.AddNodesToCollectionsInRG(ob.meta, rgName, nodes...) } } } diff --git a/internal/querycoordv2/observers/resource_observer_test.go b/internal/querycoordv2/observers/resource_observer_test.go index a1d7c37559..5ea90159fd 100644 --- a/internal/querycoordv2/observers/resource_observer_test.go +++ b/internal/querycoordv2/observers/resource_observer_test.go @@ -6,7 +6,7 @@ // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -17,15 +17,20 @@ package observers import ( "context" + "errors" "testing" "time" etcdKV "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -34,6 +39,7 @@ type ResourceObserverSuite struct { kv *etcdKV.EtcdKV //dependency + store *meta.MockStore meta *meta.Meta observer *ResourceObserver nodeMgr *session.NodeManager @@ -62,43 +68,137 @@ func (suite *ResourceObserverSuite) SetupTest() { suite.kv = etcdKV.NewEtcdKV(cli, config.MetaRootPath.GetValue()) // meta - store := meta.NewMetaStore(suite.kv) + suite.store = meta.NewMockStore(suite.T()) idAllocator := RandomIncrementIDAllocator() suite.nodeMgr = session.NewNodeManager() - suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) + suite.meta = meta.NewMeta(idAllocator, suite.store, suite.nodeMgr) suite.observer = NewResourceObserver(suite.meta) suite.observer.Start(context.TODO()) - for i := 1; i < 10; i++ { + suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil) + for i := 0; i < 10; i++ { suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost")) suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(i)) } } func (suite *ResourceObserverSuite) TestCheckNodesInReplica() { + suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil) + suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil) + suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) + suite.meta.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{100, 101}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(100, 101), + )) + + // hack all node down from replica + suite.meta.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) suite.meta.ResourceManager.AddResourceGroup("rg") suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost")) suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost")) suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost")) suite.meta.ResourceManager.AssignNode("rg", 100) suite.meta.ResourceManager.AssignNode("rg", 101) suite.meta.ResourceManager.AssignNode("rg", 102) + suite.meta.ResourceManager.AssignNode("rg", 103) suite.meta.ResourceManager.HandleNodeDown(100) suite.meta.ResourceManager.HandleNodeDown(101) //before auto recover rg suite.Eventually(func() bool { lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg") - return lackNodesNum == 2 + nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes() + return lackNodesNum == 2 && len(nodesInReplica) == 0 }, 5*time.Second, 1*time.Second) // after auto recover rg suite.Eventually(func() bool { lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg") - return lackNodesNum == 0 + nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes() + return lackNodesNum == 0 && len(nodesInReplica) == 2 + }, 5*time.Second, 1*time.Second) +} + +func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() { + suite.meta.ResourceManager.AddResourceGroup("rg") + for i := 100; i < 200; i++ { + suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost")) + suite.meta.ResourceManager.AssignNode("rg", int64(i)) 
+ suite.meta.ResourceManager.HandleNodeDown(int64(i)) + } + + suite.Eventually(func() bool { + lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg") + return lackNodesNum == 90 + }, 5*time.Second, 1*time.Second) +} + +func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() { + suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil) + suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(2) + suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) + suite.meta.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{100, 101}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(100, 101), + )) + + // hack all node down from replica + suite.meta.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error")) + suite.meta.ResourceManager.AddResourceGroup("rg") + suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost")) + suite.meta.ResourceManager.AssignNode("rg", 100) + suite.meta.ResourceManager.AssignNode("rg", 101) + suite.meta.ResourceManager.AssignNode("rg", 102) + suite.meta.ResourceManager.AssignNode("rg", 103) + suite.meta.ResourceManager.HandleNodeDown(100) + suite.meta.ResourceManager.HandleNodeDown(101) + + //before auto recover rg + suite.Eventually(func() bool { + lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg") + nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes() + return lackNodesNum == 2 && len(nodesInReplica) == 0 }, 5*time.Second, 1*time.Second) + // after auto recover rg + suite.Eventually(func() bool { + lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg") + nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes() + return lackNodesNum == 0 && len(nodesInReplica) == 0 + }, 5*time.Second, 1*time.Second) } func (suite *ResourceObserverSuite) TearDownSuite() { diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 383cefdb82..114d86bd10 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "os" - "sort" "sync" "sync/atomic" "syscall" @@ -46,6 +45,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -94,7 +94,7 @@ type Server struct { taskScheduler task.Scheduler // HeartBeat - distController *dist.Controller + distController dist.Controller // Checkers checkerController *checkers.CheckerController @@ -648,29 +648,10 @@ func (s *Server) handleNodeUp(node int64) { zap.String("resourceGroup", rgName), ) - for _, collection := range s.meta.CollectionManager.GetAll() { - log := log.With(zap.Int64("collectionID", collection)) - replica := s.meta.ReplicaManager.GetByCollectionAndNode(collection, node) - if replica == nil { - replicas := s.meta.ReplicaManager.GetByCollectionAndRG(collection, rgName) - if 
len(replicas) == 0 {
-				continue
-			}
-			sort.Slice(replicas, func(i, j int) bool {
-				return replicas[i].Len() < replicas[j].Len()
-			})
-			replica := replicas[0]
-			// TODO(yah01): this may fail, need a component to check whether a node is assigned
-			err = s.meta.ReplicaManager.AddNode(replica.GetID(), node)
-			if err != nil {
-				log.Warn("failed to assign node to replicas",
-					zap.Int64("replicaID", replica.GetID()),
-					zap.Error(err),
-				)
-			}
-			log.Info("assign node to replica",
-				zap.Int64("replicaID", replica.GetID()))
-		}
+	rgs := s.meta.ResourceManager.ListResourceGroups()
+	if len(rgs) == 1 {
+		// only __default_resource_group exists
+		utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
 	}
 }
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index 6a60f7a840..d5bee029fd 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -1044,12 +1044,30 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
 			fmt.Sprintf("transfer node num can't be [%d]", req.GetNumNode()), nil), nil
 	}
 
-	err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
+	replicasInSource := s.meta.ReplicaManager.GetByResourceGroup(req.GetSourceResourceGroup())
+	replicasInTarget := s.meta.ReplicaManager.GetByResourceGroup(req.GetTargetResourceGroup())
+	loadSameCollection := false
+	for _, r1 := range replicasInSource {
+		for _, r2 := range replicasInTarget {
+			if r1.GetCollectionID() == r2.GetCollectionID() {
+				loadSameCollection = true
+			}
+		}
+	}
+	if loadSameCollection {
+		return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+			fmt.Sprintf("can't transfer node, because resource group[%s] and resource group[%s] have loaded the same collection",
+				req.GetSourceResourceGroup(), req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
+	}
+
+	nodes, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
 	if err != nil {
 		log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
 		return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), err), nil
 	}
 
+	utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...)
+ return successStatus, nil } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 4b29ff45fc..bd666e8af9 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -27,10 +27,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/balance" + "github.com/milvus-io/milvus/internal/querycoordv2/dist" "github.com/milvus-io/milvus/internal/querycoordv2/job" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" @@ -73,7 +75,8 @@ type ServiceSuite struct { taskScheduler *task.MockScheduler balancer balance.Balance - distMgr *meta.DistributionManager + distMgr *meta.DistributionManager + distController *dist.MockController // Test object server *Server @@ -156,6 +159,7 @@ func (suite *ServiceSuite) SetupTest() { ) meta.GlobalFailedLoadCache = meta.NewFailedLoadCache() suite.distMgr = meta.NewDistributionManager() + suite.distController = dist.NewMockController(suite.T()) suite.server = &Server{ kv: suite.kv, @@ -172,6 +176,8 @@ func (suite *ServiceSuite) SetupTest() { jobScheduler: suite.jobScheduler, taskScheduler: suite.taskScheduler, balancer: suite.balancer, + distController: suite.distController, + ctx: context.Background(), } suite.server.collectionObserver = observers.NewCollectionObserver( suite.server.dist, @@ -469,6 +475,16 @@ func (suite *ServiceSuite) TestTransferNode() { suite.NoError(err) err = server.meta.ResourceManager.AddResourceGroup("rg2") suite.NoError(err) + suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) + suite.meta.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg1", + }, + typeutil.NewUniqueSet(), + )) // test transfer node resp, err := server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, @@ -480,6 +496,26 @@ func (suite *ServiceSuite) TestTransferNode() { nodes, err := server.meta.ResourceManager.GetNodes("rg1") suite.NoError(err) suite.Len(nodes, 1) + nodesInReplica := server.meta.ReplicaManager.Get(1).GetNodes() + suite.Len(nodesInReplica, 1) + + suite.meta.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg2", + }, + typeutil.NewUniqueSet(), + )) + resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ + SourceResourceGroup: "rg1", + TargetResourceGroup: "rg2", + NumNode: 1, + }) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode) + suite.Contains(resp.Reason, "can't transfer node") // test transfer node meet non-exist source rg resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ @@ -1540,6 +1576,38 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() { } } +func (suite *ServiceSuite) TestHandleNodeUp() { + server := suite.server + suite.server.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + suite.server.meta.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: 
meta.DefaultResourceGroupName, + }, + typeutil.NewUniqueSet(), + )) + + suite.taskScheduler.EXPECT().AddExecutor(mock.Anything) + suite.distController.EXPECT().StartDistInstance(mock.Anything, mock.Anything) + + suite.nodeMgr.Add(session.NewNodeInfo(111, "localhost")) + server.handleNodeUp(111) + nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes() + suite.Len(nodes, 1) + suite.Equal(int64(111), nodes[0]) + log.Info("handleNodeUp") + + // when more rg exist, new node shouldn't be assign to replica in default rg in handleNodeUp + suite.server.meta.ResourceManager.AddResourceGroup("rg") + suite.nodeMgr.Add(session.NewNodeInfo(222, "localhost")) + server.handleNodeUp(222) + nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes() + suite.Len(nodes, 1) + suite.Equal(int64(111), nodes[0]) +} + func (suite *ServiceSuite) loadAll() { ctx := context.Background() for _, collection := range suite.collections { diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index 7445d2944e..f5d9528910 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/rand" + "sort" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/querycoordv2/meta" @@ -146,6 +147,39 @@ func AssignNodesToReplicas(m *meta.Meta, rgName string, replicas ...*meta.Replic return nil } +// add nodes to all collections in rgName +// for each collection, add node to replica with least number of nodes +func AddNodesToCollectionsInRG(m *meta.Meta, rgName string, nodes ...int64) { + for _, node := range nodes { + for _, collection := range m.CollectionManager.GetAll() { + log := log.With(zap.Int64("collectionID", collection)) + replica := m.ReplicaManager.GetByCollectionAndNode(collection, node) + if replica == nil { + replicas := m.ReplicaManager.GetByCollectionAndRG(collection, rgName) + if len(replicas) == 0 { + continue + } + sort.Slice(replicas, func(i, j int) bool { + return replicas[i].Len() < replicas[j].Len() + }) + replica := replicas[0] + // TODO(yah01): this may fail, need a component to check whether a node is assigned + err := m.ReplicaManager.AddNode(replica.GetID(), node) + if err != nil { + log.Warn("failed to assign node to replicas", + zap.Int64("replicaID", replica.GetID()), + zap.Int64("nodeId", node), + zap.Error(err), + ) + continue + } + log.Info("assign node to replica", + zap.Int64("replicaID", replica.GetID())) + } + } + } +} + // SpawnReplicas spawns replicas for given collection, assign nodes to them, and save them func SpawnAllReplicasInRG(m *meta.Meta, collection int64, replicaNumber int32, rgName string) ([]*meta.Replica, error) { replicas, err := m.ReplicaManager.Spawn(collection, replicaNumber, rgName) diff --git a/internal/querycoordv2/utils/meta_test.go b/internal/querycoordv2/utils/meta_test.go index 502ea4ce7e..5ba176a1a6 100644 --- a/internal/querycoordv2/utils/meta_test.go +++ b/internal/querycoordv2/utils/meta_test.go @@ -17,13 +17,18 @@ package utils import ( + "errors" "testing" etcdKV "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestSpawnReplicasWithRG(t *testing.T) { @@ -108,3 +113,125 @@ func TestSpawnReplicasWithRG(t *testing.T) { }) } } + +func TestAddNodesToCollectionsInRGFailed(t *testing.T) { + Params.Init() + + store := meta.NewMockStore(t) + store.EXPECT().SaveCollection(mock.Anything).Return(nil) + store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(4) + store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil) + nodeMgr := session.NewNodeManager() + m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr) + m.ResourceManager.AddResourceGroup("rg") + m.CollectionManager.PutCollection(CreateTestCollection(1, 2)) + m.CollectionManager.PutCollection(CreateTestCollection(2, 2)) + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 3, + CollectionID: 2, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 4, + CollectionID: 2, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + storeErr := errors.New("store error") + store.EXPECT().SaveReplica(mock.Anything).Return(storeErr) + AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...) + + assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 0) + assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 0) + assert.Len(t, m.ReplicaManager.Get(3).GetNodes(), 0) + assert.Len(t, m.ReplicaManager.Get(4).GetNodes(), 0) +} + +func TestAddNodesToCollectionsInRG(t *testing.T) { + Params.Init() + + store := meta.NewMockStore(t) + store.EXPECT().SaveCollection(mock.Anything).Return(nil) + store.EXPECT().SaveReplica(mock.Anything).Return(nil) + store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil) + nodeMgr := session.NewNodeManager() + m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr) + m.ResourceManager.AddResourceGroup("rg") + m.CollectionManager.PutCollection(CreateTestCollection(1, 2)) + m.CollectionManager.PutCollection(CreateTestCollection(2, 2)) + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 3, + CollectionID: 2, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + m.ReplicaManager.Put(meta.NewReplica( + &querypb.Replica{ + ID: 4, + CollectionID: 2, + Nodes: []int64{}, + ResourceGroup: "rg", + }, + typeutil.NewUniqueSet(), + )) + + AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...) 
+
+	assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 2)
+	assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 2)
+	assert.Len(t, m.ReplicaManager.Get(3).GetNodes(), 2)
+	assert.Len(t, m.ReplicaManager.Get(4).GetNodes(), 2)
+}
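
Usage note: the sketch below shows one way the generated MockController could be driven through its expecter API in a test inside the dist package; the test name and the expectations chosen here are illustrative assumptions, not part of the patch.

package dist

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"
)

// Illustrative only: set expectations on the mock, then exercise it through
// the Controller interface introduced by this patch.
func TestMockControllerSketch(t *testing.T) {
	ctrl := NewMockController(t) // expectations are asserted on test cleanup

	// Expect a dist instance to be started for node 1 and the controller stopped.
	ctrl.EXPECT().StartDistInstance(mock.Anything, int64(1)).Return()
	ctrl.EXPECT().Stop().Return()

	// Code under test would receive the mock as a dist.Controller.
	var c Controller = ctrl
	c.StartDistInstance(context.Background(), 1)
	c.Stop()
}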