mirror of https://github.com/milvus-io/milvus.git

fix assign node to replica in nodeUp (#22323)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

parent 0851e05014
commit a9a263d5a8

Makefile (1 line changed)
@@ -327,6 +327,7 @@ generate-mockery: getdeps
 	$(PWD)/bin/mockery --name=Cluster --dir=$(PWD)/internal/querycoordv2/session --output=$(PWD)/internal/querycoordv2/session --filename=mock_cluster.go --with-expecter --structname=MockCluster --outpkg=session --inpackage
 	$(PWD)/bin/mockery --name=Store --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_store.go --with-expecter --structname=MockStore --outpkg=meta --inpackage
 	$(PWD)/bin/mockery --name=Balance --dir=$(PWD)/internal/querycoordv2/balance --output=$(PWD)/internal/querycoordv2/balance --filename=mock_balancer.go --with-expecter --structname=MockBalancer --outpkg=balance --inpackage
+	$(PWD)/bin/mockery --name=Controller --dir=$(PWD)/internal/querycoordv2/dist --output=$(PWD)/internal/querycoordv2/dist --filename=mock_controller.go --with-expecter --structname=MockController --outpkg=dist --inpackage
 	# internal/querynode
 	$(PWD)/bin/mockery --name=TSafeReplicaInterface --dir=$(PWD)/internal/querynode --output=$(PWD)/internal/querynode --filename=mock_tsafe_replica_test.go --with-expecter --structname=MockTSafeReplicaInterface --outpkg=querynode --inpackage
 	# internal/rootcoord
go.mod (3 lines changed)

@@ -10,7 +10,7 @@ require (
 	github.com/antonmedv/expr v1.8.9
 	github.com/apache/arrow/go/v8 v8.0.0-20220322092137-778b1772fd20
 	github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7
-	github.com/apache/thrift v0.15.0 // indirect
+	github.com/apache/thrift v0.15.0
 	github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b
 	github.com/bits-and-blooms/bloom/v3 v3.0.1
 	github.com/casbin/casbin/v2 v2.44.2
@@ -182,6 +182,7 @@ require (
 require github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect
 
 require (
+	github.com/golang/mock v1.5.0
 	github.com/uber/jaeger-client-go v2.30.0+incompatible
 	go.opentelemetry.io/otel/exporters/jaeger v1.11.2
 	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.2
go.sum (2 lines changed)

@@ -491,8 +491,6 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
-github.com/milvus-io/milvus-proto/go-api v0.0.0-20230209081028-aabbca7f95ae h1:4PPf72uc+pUFIT22yUHKrMMVyiJu8Q5l8FrQ4IkvAAY=
-github.com/milvus-io/milvus-proto/go-api v0.0.0-20230209081028-aabbca7f95ae/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
 github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-3cf200738ae7 h1:G3qXTVWaHXQkgnPOxuCb/NIxb2oD4xHHViyuTGs5KQU=
 github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-3cf200738ae7/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
 github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
@@ -27,7 +27,14 @@ import (
 	"go.uber.org/zap"
 )
 
-type Controller struct {
+type Controller interface {
+	StartDistInstance(ctx context.Context, nodeID int64)
+	Remove(nodeID int64)
+	SyncAll(ctx context.Context)
+	Stop()
+}
+
+type ControllerImpl struct {
 	mu       sync.RWMutex
 	handlers map[int64]*distHandler
 	client   session.Cluster
@@ -37,7 +44,7 @@ type Controller struct {
 	scheduler task.Scheduler
 }
 
-func (dc *Controller) StartDistInstance(ctx context.Context, nodeID int64) {
+func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
 	dc.mu.Lock()
 	defer dc.mu.Unlock()
 	if _, ok := dc.handlers[nodeID]; ok {
@@ -48,7 +55,7 @@ func (dc *Controller) StartDistInstance(ctx context.Context, nodeID int64) {
 	dc.handlers[nodeID] = h
 }
 
-func (dc *Controller) Remove(nodeID int64) {
+func (dc *ControllerImpl) Remove(nodeID int64) {
 	dc.mu.Lock()
 	defer dc.mu.Unlock()
 	if h, ok := dc.handlers[nodeID]; ok {
@@ -57,7 +64,7 @@ func (dc *Controller) Remove(nodeID int64) {
 	}
 }
 
-func (dc *Controller) SyncAll(ctx context.Context) {
+func (dc *ControllerImpl) SyncAll(ctx context.Context) {
 	dc.mu.RLock()
 	defer dc.mu.RUnlock()
 
@@ -72,7 +79,7 @@ func (dc *Controller) SyncAll(ctx context.Context) {
 	wg.Wait()
 }
 
-func (dc *Controller) Stop() {
+func (dc *ControllerImpl) Stop() {
 	dc.mu.Lock()
 	defer dc.mu.Unlock()
 	for _, h := range dc.handlers {
@@ -86,8 +93,8 @@ func NewDistController(
 	dist *meta.DistributionManager,
 	targetMgr *meta.TargetManager,
 	scheduler task.Scheduler,
-) *Controller {
-	return &Controller{
+) *ControllerImpl {
+	return &ControllerImpl{
 		handlers:    make(map[int64]*distHandler),
 		client:      client,
 		nodeManager: nodeManager,

@@ -37,7 +37,7 @@ import (
 
 type DistControllerTestSuite struct {
 	suite.Suite
-	controller    *Controller
+	controller    *ControllerImpl
 	mockCluster   *session.MockCluster
 	mockScheduler *task.MockScheduler
 
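The change above splits the concrete dist controller into a Controller interface plus a ControllerImpl, so that consumers (the query coordinator Server, later in this diff) can hold the interface while tests inject the generated MockController. A minimal, self-contained sketch of that dependency-inversion pattern; fakeController, server, and handleNodeUp here are illustrative stand-ins, not Milvus code:

package main

import (
	"context"
	"fmt"
)

// Controller mirrors the interface extracted in this commit.
type Controller interface {
	StartDistInstance(ctx context.Context, nodeID int64)
	Remove(nodeID int64)
	SyncAll(ctx context.Context)
	Stop()
}

// fakeController is a hand-written stand-in, playing the role the
// generated MockController plays in the real tests.
type fakeController struct {
	started []int64
}

func (f *fakeController) StartDistInstance(ctx context.Context, nodeID int64) {
	f.started = append(f.started, nodeID)
}
func (f *fakeController) Remove(nodeID int64)         {}
func (f *fakeController) SyncAll(ctx context.Context) {}
func (f *fakeController) Stop()                       {}

// server depends on the interface, as Server.distController now does.
type server struct {
	distController Controller
}

func (s *server) handleNodeUp(node int64) {
	// The real handleNodeUp starts a dist handler before touching replicas.
	s.distController.StartDistInstance(context.Background(), node)
}

func main() {
	fake := &fakeController{}
	s := &server{distController: fake}
	s.handleNodeUp(111)
	fmt.Println(fake.started) // [111]
}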
@@ -0,0 +1,149 @@
+// Code generated by mockery v2.16.0. DO NOT EDIT.
+
+package dist
+
+import (
+	context "context"
+
+	mock "github.com/stretchr/testify/mock"
+)
+
+// MockController is an autogenerated mock type for the Controller type
+type MockController struct {
+	mock.Mock
+}
+
+type MockController_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockController) EXPECT() *MockController_Expecter {
+	return &MockController_Expecter{mock: &_m.Mock}
+}
+
+// Remove provides a mock function with given fields: nodeID
+func (_m *MockController) Remove(nodeID int64) {
+	_m.Called(nodeID)
+}
+
+// MockController_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove'
+type MockController_Remove_Call struct {
+	*mock.Call
+}
+
+// Remove is a helper method to define mock.On call
+//   - nodeID int64
+func (_e *MockController_Expecter) Remove(nodeID interface{}) *MockController_Remove_Call {
+	return &MockController_Remove_Call{Call: _e.mock.On("Remove", nodeID)}
+}
+
+func (_c *MockController_Remove_Call) Run(run func(nodeID int64)) *MockController_Remove_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockController_Remove_Call) Return() *MockController_Remove_Call {
+	_c.Call.Return()
+	return _c
+}
+
+// StartDistInstance provides a mock function with given fields: ctx, nodeID
+func (_m *MockController) StartDistInstance(ctx context.Context, nodeID int64) {
+	_m.Called(ctx, nodeID)
+}
+
+// MockController_StartDistInstance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartDistInstance'
+type MockController_StartDistInstance_Call struct {
+	*mock.Call
+}
+
+// StartDistInstance is a helper method to define mock.On call
+//   - ctx context.Context
+//   - nodeID int64
+func (_e *MockController_Expecter) StartDistInstance(ctx interface{}, nodeID interface{}) *MockController_StartDistInstance_Call {
+	return &MockController_StartDistInstance_Call{Call: _e.mock.On("StartDistInstance", ctx, nodeID)}
+}
+
+func (_c *MockController_StartDistInstance_Call) Run(run func(ctx context.Context, nodeID int64)) *MockController_StartDistInstance_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(int64))
+	})
+	return _c
+}
+
+func (_c *MockController_StartDistInstance_Call) Return() *MockController_StartDistInstance_Call {
+	_c.Call.Return()
+	return _c
+}
+
+// Stop provides a mock function with given fields:
+func (_m *MockController) Stop() {
+	_m.Called()
+}
+
+// MockController_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
+type MockController_Stop_Call struct {
+	*mock.Call
+}
+
+// Stop is a helper method to define mock.On call
+func (_e *MockController_Expecter) Stop() *MockController_Stop_Call {
+	return &MockController_Stop_Call{Call: _e.mock.On("Stop")}
+}
+
+func (_c *MockController_Stop_Call) Run(run func()) *MockController_Stop_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockController_Stop_Call) Return() *MockController_Stop_Call {
+	_c.Call.Return()
+	return _c
+}
+
+// SyncAll provides a mock function with given fields: ctx
+func (_m *MockController) SyncAll(ctx context.Context) {
+	_m.Called(ctx)
+}
+
+// MockController_SyncAll_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncAll'
+type MockController_SyncAll_Call struct {
+	*mock.Call
+}
+
+// SyncAll is a helper method to define mock.On call
+//   - ctx context.Context
+func (_e *MockController_Expecter) SyncAll(ctx interface{}) *MockController_SyncAll_Call {
+	return &MockController_SyncAll_Call{Call: _e.mock.On("SyncAll", ctx)}
+}
+
+func (_c *MockController_SyncAll_Call) Run(run func(ctx context.Context)) *MockController_SyncAll_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context))
+	})
+	return _c
+}
+
+func (_c *MockController_SyncAll_Call) Return() *MockController_SyncAll_Call {
+	_c.Call.Return()
+	return _c
+}
+
+type mockConstructorTestingTNewMockController interface {
+	mock.TestingT
+	Cleanup(func())
+}
+
+// NewMockController creates a new instance of MockController. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+func NewMockController(t mockConstructorTestingTNewMockController) *MockController {
+	mock := &MockController{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
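The generated mock follows mockery's expecter pattern: EXPECT() returns a typed builder whose Run/Return shadow the untyped mock.Call methods. A sketch of how a test consumes it, modeled on the call shapes used in the services_test.go changes later in this diff (the test name and _test package here are illustrative):

package dist_test

import (
	"context"
	"testing"

	"github.com/milvus-io/milvus/internal/querycoordv2/dist"
	"github.com/stretchr/testify/mock"
)

func TestControllerMockSketch(t *testing.T) {
	// NewMockController registers a cleanup that asserts all expectations.
	c := dist.NewMockController(t)

	// Expect StartDistInstance with any arguments and observe the call.
	c.EXPECT().
		StartDistInstance(mock.Anything, mock.Anything).
		Run(func(ctx context.Context, nodeID int64) {
			t.Logf("dist instance started for node %d", nodeID)
		}).
		Return()

	c.StartDistInstance(context.Background(), 111)
}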
@@ -449,6 +449,21 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) {
 	}
 
 	// assign new node to default rg
+	newNodes := rm.groups[DefaultResourceGroupName].GetNodes()
+	newNodes = append(newNodes, node)
+	err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
+		Name:     DefaultResourceGroupName,
+		Capacity: int32(rm.groups[DefaultResourceGroupName].GetCapacity()),
+		Nodes:    newNodes,
+	})
+	if err != nil {
+		log.Info("failed to add node to resource group",
+			zap.String("rgName", DefaultResourceGroupName),
+			zap.Int64("node", node),
+			zap.Error(err),
+		)
+		return "", err
+	}
 	rm.groups[DefaultResourceGroupName].assignNode(node, 0)
 	log.Info("HandleNodeUp: add node to default resource group",
 		zap.String("rgName", DefaultResourceGroupName),
@@ -466,35 +481,56 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) {
 	}
 
 	rgName, err := rm.findResourceGroupByNode(node)
-	if err == nil {
-		log.Info("HandleNodeDown: remove node from resource group",
-			zap.String("rgName", rgName),
-			zap.Int64("node", node),
-		)
-		return rgName, rm.groups[rgName].unassignNode(node, 0)
+	if err != nil {
+		return "", ErrNodeNotAssignToRG
 	}
 
-	return "", ErrNodeNotAssignToRG
+	newNodes := []int64{}
+	for _, nid := range rm.groups[rgName].GetNodes() {
+		if nid != node {
+			newNodes = append(newNodes, nid)
+		}
+	}
+	err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
+		Name:     rgName,
+		Capacity: int32(rm.groups[rgName].GetCapacity()),
+		Nodes:    newNodes,
+	})
+	if err != nil {
+		log.Info("failed to add node to resource group",
+			zap.String("rgName", rgName),
+			zap.Int64("node", node),
+			zap.Error(err),
+		)
+		return "", err
+	}
+
+	log.Info("HandleNodeDown: remove node from resource group",
+		zap.String("rgName", rgName),
+		zap.Int64("node", node),
+	)
+	return rgName, rm.groups[rgName].unassignNode(node, 0)
 }
 
-func (rm *ResourceManager) TransferNode(from string, to string, numNode int) error {
+func (rm *ResourceManager) TransferNode(from string, to string, numNode int) ([]int64, error) {
 	rm.rwmutex.Lock()
 	defer rm.rwmutex.Unlock()
 
 	if rm.groups[from] == nil || rm.groups[to] == nil {
-		return ErrRGNotExist
+		return nil, ErrRGNotExist
 	}
 
 	rm.checkRGNodeStatus(from)
 	rm.checkRGNodeStatus(to)
 	if len(rm.groups[from].nodes) < numNode {
-		return ErrNodeNotEnough
+		return nil, ErrNodeNotEnough
 	}
 
 	//todo: a better way to choose a node with least balance cost
 	movedNodes, err := rm.transferNodeInStore(from, to, numNode)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	deltaFromCapacity := -1
@@ -510,17 +546,17 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) err
 		err := rm.groups[from].unassignNode(node, deltaFromCapacity)
 		if err != nil {
 			// interrupt transfer, unreachable logic path
-			return err
+			return nil, err
 		}
 
 		err = rm.groups[to].assignNode(node, deltaToCapacity)
 		if err != nil {
 			// interrupt transfer, unreachable logic path
-			return err
+			return nil, err
 		}
 	}
 
-	return nil
+	return movedNodes, nil
 }
 
 func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode int) ([]int64, error) {
@@ -569,14 +605,16 @@ func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode i
 }
 
 // auto recover rg, return recover used node num
-func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error) {
+func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) ([]int64, error) {
 	rm.rwmutex.Lock()
 	defer rm.rwmutex.Unlock()
 
 	if rm.groups[rgName] == nil {
-		return 0, ErrRGNotExist
+		return nil, ErrRGNotExist
 	}
 
+	ret := make([]int64, 0)
+
 	rm.checkRGNodeStatus(rgName)
 	lackNodesNum := rm.groups[rgName].LackOfNodes()
 	nodesInDefault := rm.groups[DefaultResourceGroupName].GetNodes()
@@ -586,17 +624,20 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error)
 		err := rm.unassignNode(DefaultResourceGroupName, node)
 		if err != nil {
 			// interrupt transfer, unreachable logic path
-			return i + 1, err
+			return ret, err
 		}
 
 		err = rm.groups[rgName].assignNode(node, 0)
 		if err != nil {
 			// roll back, unreachable logic path
 			rm.assignNode(DefaultResourceGroupName, node)
+			return ret, err
 		}
+
+		ret = append(ret, node)
 	}
 
-	return lackNodesNum, nil
+	return ret, nil
 }
 
 func (rm *ResourceManager) Recover() error {
@@ -614,17 +655,16 @@ func (rm *ResourceManager) Recover() error {
 			rm.groups[rg.GetName()].assignNode(node, 0)
 		}
 	} else {
-		rm.groups[rg.GetName()] = NewResourceGroup(0)
+		rm.groups[rg.GetName()] = NewResourceGroup(int(rg.GetCapacity()))
 		for _, node := range rg.GetNodes() {
-			rm.groups[rg.GetName()].assignNode(node, 1)
+			rm.groups[rg.GetName()].assignNode(node, 0)
 		}
 	}
 
-	rm.checkRGNodeStatus(rg.GetName())
 	log.Info("Recover resource group",
 		zap.String("rgName", rg.GetName()),
-		zap.Int64s("nodes", rg.GetNodes()),
-		zap.Int32("capacity", rg.GetCapacity()),
+		zap.Int64s("nodes", rm.groups[rg.GetName()].GetNodes()),
+		zap.Int("capacity", rm.groups[rg.GetName()].GetCapacity()),
 	)
 }
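A pattern worth noting in the two handlers above: the updated node list is written through store.SaveResourceGroup first, and the in-memory group is mutated only after the write succeeds, so a failed etcd write leaves memory and storage consistent. A condensed sketch of that ordering, with the store and group types simplified; saveToStore is a stand-in for store.SaveResourceGroup:

package main

import (
	"errors"
	"fmt"
)

var errStore = errors.New("store error")

type resourceGroup struct{ nodes []int64 }

// saveToStore stands in for store.SaveResourceGroup; flip `fail`
// to simulate an etcd write error.
func saveToStore(name string, nodes []int64, fail bool) error {
	if fail {
		return errStore
	}
	return nil
}

// handleNodeUp mirrors the commit's ordering: persist the new
// membership first, then update memory only on success.
func handleNodeUp(rg *resourceGroup, node int64, storeFails bool) error {
	newNodes := append(append([]int64{}, rg.nodes...), node)
	if err := saveToStore("__default_resource_group", newNodes, storeFails); err != nil {
		return err // in-memory state untouched
	}
	rg.nodes = newNodes
	return nil
}

func main() {
	rg := &resourceGroup{}
	fmt.Println(handleNodeUp(rg, 1, true), rg.nodes)  // store error []
	fmt.Println(handleNodeUp(rg, 1, false), rg.nodes) // <nil> [1]
}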
@@ -16,6 +16,7 @@
 package meta
 
 import (
+	"errors"
 	"testing"
 
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@@ -24,6 +25,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/util/etcd"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 )
 
@@ -114,14 +116,14 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
 	suite.ErrorIs(err, ErrNodeAlreadyAssign)
 
 	// transfer node between rgs
-	err = suite.manager.TransferNode("rg1", "rg2", 1)
+	_, err = suite.manager.TransferNode("rg1", "rg2", 1)
 	suite.NoError(err)
 
 	// transfer meet non exist rg
-	err = suite.manager.TransferNode("rgggg", "rg2", 1)
+	_, err = suite.manager.TransferNode("rgggg", "rg2", 1)
 	suite.ErrorIs(err, ErrRGNotExist)
 
-	err = suite.manager.TransferNode("rg1", "rg2", 5)
+	_, err = suite.manager.TransferNode("rg1", "rg2", 5)
 	suite.ErrorIs(err, ErrNodeNotEnough)
 
 	suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
@@ -188,26 +190,42 @@ func (suite *ResourceManagerSuite) TestRecover() {
 	suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
 	suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
 	suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
-	err := suite.manager.AddResourceGroup("rg")
+	suite.manager.nodeMgr.Add(session.NewNodeInfo(4, "localhost"))
+	err := suite.manager.AddResourceGroup("rg1")
+	suite.NoError(err)
+	err = suite.manager.AddResourceGroup("rg2")
 	suite.NoError(err)
 
-	suite.manager.AssignNode("rg", 1)
-	suite.manager.AssignNode("rg", 2)
-	suite.manager.AssignNode("rg", 3)
+	suite.manager.AssignNode("rg1", 1)
+	suite.manager.AssignNode("rg2", 2)
+	suite.manager.AssignNode(DefaultResourceGroupName, 3)
+	suite.manager.AssignNode(DefaultResourceGroupName, 4)
 
-	suite.manager.UnassignNode("rg", 3)
+	suite.manager.HandleNodeDown(2)
+	suite.manager.HandleNodeDown(3)
 
 	// clear resource manager in hack way
-	delete(suite.manager.groups, "rg")
+	delete(suite.manager.groups, "rg1")
+	delete(suite.manager.groups, "rg2")
 	delete(suite.manager.groups, DefaultResourceGroupName)
 	suite.manager.Recover()
 
-	rg, err := suite.manager.GetResourceGroup("rg")
+	rg, err := suite.manager.GetResourceGroup("rg1")
 	suite.NoError(err)
-	suite.Equal(2, rg.GetCapacity())
-	suite.True(suite.manager.ContainsNode("rg", 1))
-	suite.True(suite.manager.ContainsNode("rg", 2))
-	suite.False(suite.manager.ContainsNode("rg", 3))
+	suite.Equal(1, rg.GetCapacity())
+	suite.True(suite.manager.ContainsNode("rg1", 1))
+	print(suite.manager.GetNodes("rg1"))
+
+	rg, err = suite.manager.GetResourceGroup("rg2")
+	suite.NoError(err)
+	suite.Equal(1, rg.GetCapacity())
+	suite.False(suite.manager.ContainsNode("rg2", 2))
+
+	rg, err = suite.manager.GetResourceGroup(DefaultResourceGroupName)
+	suite.NoError(err)
+	suite.Equal(DefaultResourceGroupCapacity, rg.GetCapacity())
+	suite.False(suite.manager.ContainsNode(DefaultResourceGroupName, 3))
+	suite.True(suite.manager.ContainsNode(DefaultResourceGroupName, 4))
 }
 
 func (suite *ResourceManagerSuite) TestCheckOutboundNodes() {
@@ -340,6 +358,46 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
 	suite.Len(defaultRG.GetNodes(), 6)
 }
 
+func (suite *ResourceManagerSuite) TestStoreFailed() {
+	store := NewMockStore(suite.T())
+	nodeMgr := session.NewNodeManager()
+	manager := NewResourceManager(store, nodeMgr)
+
+	nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
+	nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
+	nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
+	storeErr := errors.New("store error")
+	store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(storeErr)
+	store.EXPECT().RemoveResourceGroup(mock.Anything).Return(storeErr)
+
+	err := manager.AddResourceGroup("rg")
+	suite.ErrorIs(err, storeErr)
+
+	manager.groups["rg"] = &ResourceGroup{
+		nodes:    typeutil.NewUniqueSet(),
+		capacity: 0,
+	}
+
+	err = manager.RemoveResourceGroup("rg")
+	suite.ErrorIs(err, storeErr)
+
+	err = manager.AssignNode("rg", 1)
+	suite.ErrorIs(err, storeErr)
+
+	manager.groups["rg"].assignNode(1, 1)
+	err = manager.UnassignNode("rg", 1)
+	suite.ErrorIs(err, storeErr)
+
+	_, err = manager.TransferNode("rg", DefaultResourceGroupName, 1)
+	suite.ErrorIs(err, storeErr)
+
+	_, err = manager.HandleNodeUp(2)
+	suite.ErrorIs(err, storeErr)
+
+	_, err = manager.HandleNodeDown(1)
+	suite.ErrorIs(err, storeErr)
+}
+
 func (suite *ResourceManagerSuite) TearDownSuite() {
 	suite.kv.Close()
 }
@@ -24,6 +24,7 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/params"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"go.uber.org/zap"
 )
 
@@ -93,14 +94,16 @@ func (ob *ResourceObserver) checkResourceGroup() {
 		)
 
 		if enableRGAutoRecover {
-			usedNodeNum, err := manager.AutoRecoverResourceGroup(rgName)
+			nodes, err := manager.AutoRecoverResourceGroup(rgName)
 			if err != nil {
 				log.Warn("failed to recover resource group",
 					zap.String("rgName", rgName),
-					zap.Int("lackNodeNum", lackNodeNum-usedNodeNum),
+					zap.Int("lackNodeNum", lackNodeNum-len(nodes)),
 					zap.Error(err),
 				)
 			}
+
+			utils.AddNodesToCollectionsInRG(ob.meta, rgName, nodes...)
 		}
 	}
 }
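Because AutoRecoverResourceGroup now returns the IDs of the nodes it actually moved rather than a count, the observer can hand exactly those nodes to AddNodesToCollectionsInRG, and after a partial failure the lack count stays accurate. A sketch of that control flow; autoRecover and addToReplicas are stand-ins for ResourceManager.AutoRecoverResourceGroup and utils.AddNodesToCollectionsInRG:

package main

import "log"

// autoRecover and addToReplicas stand in for the two real calls.
func autoRecover(rgName string) ([]int64, error) { return []int64{7, 8}, nil }

func addToReplicas(rgName string, nodes ...int64) {
	log.Printf("adding nodes %v to collections in %s", nodes, rgName)
}

// checkResourceGroup mirrors the observer's new flow: the recovered
// node IDs, not just a count, drive the follow-up replica assignment.
func checkResourceGroup(rgName string, lackNodeNum int) {
	nodes, err := autoRecover(rgName)
	if err != nil {
		// even on partial failure, `nodes` holds what did succeed
		log.Printf("rg %s still lacks %d nodes: %v", rgName, lackNodeNum-len(nodes), err)
	}
	addToReplicas(rgName, nodes...)
}

func main() { checkResourceGroup("rg", 2) }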
@@ -6,7 +6,7 @@
 // "License"); you may not use this file except in compliance
 // with the License. You may obtain a copy of the License at
 //
 //     http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,15 +17,20 @@ package observers
 
 import (
 	"context"
+	"errors"
 	"testing"
 	"time"
 
 	etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/internal/util/etcd"
 	"github.com/milvus-io/milvus/internal/util/paramtable"
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 )
 
@@ -34,6 +39,7 @@ type ResourceObserverSuite struct {
 
 	kv *etcdKV.EtcdKV
 	//dependency
+	store    *meta.MockStore
 	meta     *meta.Meta
 	observer *ResourceObserver
 	nodeMgr  *session.NodeManager
@@ -62,43 +68,137 @@ func (suite *ResourceObserverSuite) SetupTest() {
 	suite.kv = etcdKV.NewEtcdKV(cli, config.MetaRootPath.GetValue())
 
 	// meta
-	store := meta.NewMetaStore(suite.kv)
+	suite.store = meta.NewMockStore(suite.T())
 	idAllocator := RandomIncrementIDAllocator()
 	suite.nodeMgr = session.NewNodeManager()
-	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
+	suite.meta = meta.NewMeta(idAllocator, suite.store, suite.nodeMgr)
 
 	suite.observer = NewResourceObserver(suite.meta)
 	suite.observer.Start(context.TODO())
 
-	for i := 1; i < 10; i++ {
+	suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
+	for i := 0; i < 10; i++ {
 		suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
 		suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(i))
 	}
 }
 
 func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
+	suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
+	suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil)
+	suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
+	suite.meta.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            1,
+			CollectionID:  1,
+			Nodes:         []int64{100, 101},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(100, 101),
+	))
+
+	// hack all node down from replica
+	suite.meta.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            2,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
 	suite.meta.ResourceManager.AddResourceGroup("rg")
 	suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost"))
 	suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost"))
 	suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost"))
+	suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost"))
 	suite.meta.ResourceManager.AssignNode("rg", 100)
 	suite.meta.ResourceManager.AssignNode("rg", 101)
 	suite.meta.ResourceManager.AssignNode("rg", 102)
+	suite.meta.ResourceManager.AssignNode("rg", 103)
 	suite.meta.ResourceManager.HandleNodeDown(100)
 	suite.meta.ResourceManager.HandleNodeDown(101)
 
 	//before auto recover rg
 	suite.Eventually(func() bool {
 		lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
-		return lackNodesNum == 2
+		nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
+		return lackNodesNum == 2 && len(nodesInReplica) == 0
 	}, 5*time.Second, 1*time.Second)
 
 	// after auto recover rg
 	suite.Eventually(func() bool {
 		lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
-		return lackNodesNum == 0
+		nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
+		return lackNodesNum == 0 && len(nodesInReplica) == 2
 	}, 5*time.Second, 1*time.Second)
 }
+
+func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() {
+	suite.meta.ResourceManager.AddResourceGroup("rg")
+	for i := 100; i < 200; i++ {
+		suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
+		suite.meta.ResourceManager.AssignNode("rg", int64(i))
+		suite.meta.ResourceManager.HandleNodeDown(int64(i))
+	}
+
+	suite.Eventually(func() bool {
+		lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
+		return lackNodesNum == 90
+	}, 5*time.Second, 1*time.Second)
+}
+
+func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
+	suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
+	suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(2)
+	suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
+	suite.meta.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            1,
+			CollectionID:  1,
+			Nodes:         []int64{100, 101},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(100, 101),
+	))
+
+	// hack all node down from replica
+	suite.meta.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            2,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error"))
+	suite.meta.ResourceManager.AddResourceGroup("rg")
+	suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost"))
+	suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost"))
+	suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost"))
+	suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost"))
+	suite.meta.ResourceManager.AssignNode("rg", 100)
+	suite.meta.ResourceManager.AssignNode("rg", 101)
+	suite.meta.ResourceManager.AssignNode("rg", 102)
+	suite.meta.ResourceManager.AssignNode("rg", 103)
+	suite.meta.ResourceManager.HandleNodeDown(100)
+	suite.meta.ResourceManager.HandleNodeDown(101)
+
+	//before auto recover rg
+	suite.Eventually(func() bool {
+		lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
+		nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
+		return lackNodesNum == 2 && len(nodesInReplica) == 0
+	}, 5*time.Second, 1*time.Second)
+
+	// after auto recover rg
+	suite.Eventually(func() bool {
+		lackNodesNum := suite.meta.ResourceManager.CheckLackOfNode("rg")
+		nodesInReplica := suite.meta.ReplicaManager.Get(2).GetNodes()
+		return lackNodesNum == 0 && len(nodesInReplica) == 0
+	}, 5*time.Second, 1*time.Second)
+}
 
 func (suite *ResourceObserverSuite) TearDownSuite() {
@@ -21,7 +21,6 @@ import (
 	"errors"
 	"fmt"
 	"os"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"syscall"
@@ -46,6 +45,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/metricsinfo"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
@@ -94,7 +94,7 @@ type Server struct {
 	taskScheduler task.Scheduler
 
 	// HeartBeat
-	distController *dist.Controller
+	distController dist.Controller
 
 	// Checkers
 	checkerController *checkers.CheckerController
@@ -648,29 +648,10 @@ func (s *Server) handleNodeUp(node int64) {
 		zap.String("resourceGroup", rgName),
 	)
 
-	for _, collection := range s.meta.CollectionManager.GetAll() {
-		log := log.With(zap.Int64("collectionID", collection))
-		replica := s.meta.ReplicaManager.GetByCollectionAndNode(collection, node)
-		if replica == nil {
-			replicas := s.meta.ReplicaManager.GetByCollectionAndRG(collection, rgName)
-			if len(replicas) == 0 {
-				continue
-			}
-			sort.Slice(replicas, func(i, j int) bool {
-				return replicas[i].Len() < replicas[j].Len()
-			})
-			replica := replicas[0]
-			// TODO(yah01): this may fail, need a component to check whether a node is assigned
-			err = s.meta.ReplicaManager.AddNode(replica.GetID(), node)
-			if err != nil {
-				log.Warn("failed to assign node to replicas",
-					zap.Int64("replicaID", replica.GetID()),
-					zap.Error(err),
-				)
-			}
-			log.Info("assign node to replica",
-				zap.Int64("replicaID", replica.GetID()))
-		}
+	rgs := s.meta.ResourceManager.ListResourceGroups()
+	if len(rgs) == 1 {
+		// only __default_resource_group exists
+		utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
 	}
 }
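The handleNodeUp rewrite above is the behavioral core of this commit: the old loop always pushed a new node into some replica, while the new code auto-assigns to replicas only when __default_resource_group is the sole resource group, leaving the node available for resource-group recovery otherwise. A distilled sketch of the guard; the helper names here are illustrative stand-ins:

package main

import "fmt"

const defaultRG = "__default_resource_group"

// addNodesToCollectionsInRG stands in for the utility the commit
// extracts into the utils package.
func addNodesToCollectionsInRG(rgName string, nodes ...int64) {
	fmt.Printf("assign %v to replicas in %s\n", nodes, rgName)
}

// handleNodeUp mirrors the commit's guard: auto-assign to replicas
// only when the default resource group is the sole resource group.
func handleNodeUp(resourceGroups []string, node int64) {
	if len(resourceGroups) == 1 {
		// only __default_resource_group exists
		addNodesToCollectionsInRG(defaultRG, node)
		return
	}
	fmt.Printf("node %d joins %s but is not auto-assigned to replicas\n", node, defaultRG)
}

func main() {
	handleNodeUp([]string{defaultRG}, 111)       // assigned
	handleNodeUp([]string{defaultRG, "rg"}, 222) // left unassigned
}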
@@ -1044,12 +1044,30 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
 			fmt.Sprintf("transfer node num can't be [%d]", req.GetNumNode()), nil), nil
 	}
 
-	err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
+	replicasInSource := s.meta.ReplicaManager.GetByResourceGroup(req.GetSourceResourceGroup())
+	replicasInTarget := s.meta.ReplicaManager.GetByResourceGroup(req.GetTargetResourceGroup())
+	loadSameCollection := false
+	for _, r1 := range replicasInSource {
+		for _, r2 := range replicasInTarget {
+			if r1.GetCollectionID() == r2.GetCollectionID() {
+				loadSameCollection = true
+			}
+		}
+	}
+	if loadSameCollection {
+		return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+			fmt.Sprintf("can't transfer node, cause the resource group[%s] and the resource group[%s] loaded same collection",
+				req.GetSourceResourceGroup(), req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
+	}
+
+	nodes, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
 	if err != nil {
 		log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
 		return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), err), nil
 	}
 
+	utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...)
+
 	return successStatus, nil
 }
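The new guard in TransferNode rejects a transfer whenever the source and target resource groups hold replicas of the same collection, presumably to keep one collection's replicas from ending up with overlapping node sets across groups. A reduced sketch of the check:

package main

import "fmt"

type replica struct{ collectionID int64 }

// loadSameCollection mirrors the nested loop added to TransferNode:
// it reports whether any replica in the source group shares a
// collection with any replica in the target group.
func loadSameCollection(source, target []replica) bool {
	for _, r1 := range source {
		for _, r2 := range target {
			if r1.collectionID == r2.collectionID {
				return true
			}
		}
	}
	return false
}

func main() {
	src := []replica{{collectionID: 1}}
	dst := []replica{{collectionID: 1}}
	fmt.Println(loadSameCollection(src, dst)) // true: transfer is rejected
}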
@@ -27,10 +27,12 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
 	"github.com/milvus-io/milvus/internal/kv"
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
+	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/balance"
+	"github.com/milvus-io/milvus/internal/querycoordv2/dist"
 	"github.com/milvus-io/milvus/internal/querycoordv2/job"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/observers"
@@ -73,7 +75,8 @@ type ServiceSuite struct {
 	taskScheduler *task.MockScheduler
 	balancer      balance.Balance
 
-	distMgr        *meta.DistributionManager
+	distMgr        *meta.DistributionManager
+	distController *dist.MockController
 
 	// Test object
 	server *Server
@@ -156,6 +159,7 @@ func (suite *ServiceSuite) SetupTest() {
 	)
 	meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()
 	suite.distMgr = meta.NewDistributionManager()
+	suite.distController = dist.NewMockController(suite.T())
 
 	suite.server = &Server{
 		kv: suite.kv,
@@ -172,6 +176,8 @@ func (suite *ServiceSuite) SetupTest() {
 		jobScheduler:   suite.jobScheduler,
 		taskScheduler:  suite.taskScheduler,
 		balancer:       suite.balancer,
+		distController: suite.distController,
+		ctx:            context.Background(),
 	}
 	suite.server.collectionObserver = observers.NewCollectionObserver(
 		suite.server.dist,
@@ -469,6 +475,16 @@ func (suite *ServiceSuite) TestTransferNode() {
 	suite.NoError(err)
 	err = server.meta.ResourceManager.AddResourceGroup("rg2")
 	suite.NoError(err)
+	suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
+	suite.meta.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            1,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg1",
+		},
+		typeutil.NewUniqueSet(),
+	))
 	// test transfer node
 	resp, err := server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
 		SourceResourceGroup: meta.DefaultResourceGroupName,
@@ -480,6 +496,26 @@ func (suite *ServiceSuite) TestTransferNode() {
 	nodes, err := server.meta.ResourceManager.GetNodes("rg1")
 	suite.NoError(err)
 	suite.Len(nodes, 1)
+	nodesInReplica := server.meta.ReplicaManager.Get(1).GetNodes()
+	suite.Len(nodesInReplica, 1)
+
+	suite.meta.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            2,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg2",
+		},
+		typeutil.NewUniqueSet(),
+	))
+	resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
+		SourceResourceGroup: "rg1",
+		TargetResourceGroup: "rg2",
+		NumNode:             1,
+	})
+	suite.NoError(err)
+	suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
+	suite.Contains(resp.Reason, "can't transfer node")
+
 	// test transfer node meet non-exist source rg
 	resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
@@ -1540,6 +1576,38 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
 	}
 }
 
+func (suite *ServiceSuite) TestHandleNodeUp() {
+	server := suite.server
+	suite.server.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
+	suite.server.meta.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            1,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: meta.DefaultResourceGroupName,
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	suite.taskScheduler.EXPECT().AddExecutor(mock.Anything)
+	suite.distController.EXPECT().StartDistInstance(mock.Anything, mock.Anything)
+
+	suite.nodeMgr.Add(session.NewNodeInfo(111, "localhost"))
+	server.handleNodeUp(111)
+	nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes()
+	suite.Len(nodes, 1)
+	suite.Equal(int64(111), nodes[0])
+	log.Info("handleNodeUp")
+
+	// when more rg exist, new node shouldn't be assign to replica in default rg in handleNodeUp
+	suite.server.meta.ResourceManager.AddResourceGroup("rg")
+	suite.nodeMgr.Add(session.NewNodeInfo(222, "localhost"))
+	server.handleNodeUp(222)
+	nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes()
+	suite.Len(nodes, 1)
+	suite.Equal(int64(111), nodes[0])
+}
+
 func (suite *ServiceSuite) loadAll() {
 	ctx := context.Background()
 	for _, collection := range suite.collections {
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"sort"
 
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -146,6 +147,39 @@ func AssignNodesToReplicas(m *meta.Meta, rgName string, replicas ...*meta.Replic
 	return nil
 }
 
+// add nodes to all collections in rgName
+// for each collection, add node to replica with least number of nodes
+func AddNodesToCollectionsInRG(m *meta.Meta, rgName string, nodes ...int64) {
+	for _, node := range nodes {
+		for _, collection := range m.CollectionManager.GetAll() {
+			log := log.With(zap.Int64("collectionID", collection))
+			replica := m.ReplicaManager.GetByCollectionAndNode(collection, node)
+			if replica == nil {
+				replicas := m.ReplicaManager.GetByCollectionAndRG(collection, rgName)
+				if len(replicas) == 0 {
+					continue
+				}
+				sort.Slice(replicas, func(i, j int) bool {
+					return replicas[i].Len() < replicas[j].Len()
+				})
+				replica := replicas[0]
+				// TODO(yah01): this may fail, need a component to check whether a node is assigned
+				err := m.ReplicaManager.AddNode(replica.GetID(), node)
+				if err != nil {
+					log.Warn("failed to assign node to replicas",
+						zap.Int64("replicaID", replica.GetID()),
+						zap.Int64("nodeId", node),
+						zap.Error(err),
+					)
+					continue
+				}
+				log.Info("assign node to replica",
+					zap.Int64("replicaID", replica.GetID()))
+			}
+		}
+	}
+}
+
 // SpawnReplicas spawns replicas for given collection, assign nodes to them, and save them
 func SpawnAllReplicasInRG(m *meta.Meta, collection int64, replicaNumber int32, rgName string) ([]*meta.Replica, error) {
 	replicas, err := m.ReplicaManager.Spawn(collection, replicaNumber, rgName)
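The heart of the extracted helper is its replica choice: for each collection in the resource group, a new node joins the replica that currently has the fewest nodes. A reduced sketch of that least-loaded selection, with the replica type simplified:

package main

import (
	"fmt"
	"sort"
)

type replica struct {
	id    int64
	nodes []int64
}

// pickLeastLoaded mirrors the sort.Slice in AddNodesToCollectionsInRG:
// the new node lands on the replica with the fewest nodes.
func pickLeastLoaded(replicas []*replica) *replica {
	sort.Slice(replicas, func(i, j int) bool {
		return len(replicas[i].nodes) < len(replicas[j].nodes)
	})
	return replicas[0]
}

func main() {
	replicas := []*replica{
		{id: 1, nodes: []int64{10, 11}},
		{id: 2, nodes: []int64{12}},
	}
	target := pickLeastLoaded(replicas)
	target.nodes = append(target.nodes, 99)
	fmt.Println(target.id, target.nodes) // 2 [12 99]
}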
@@ -17,13 +17,18 @@
 package utils
 
 import (
+	"errors"
 	"testing"
 
 	etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/util/etcd"
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 )
 
 func TestSpawnReplicasWithRG(t *testing.T) {
@@ -108,3 +113,125 @@ func TestSpawnReplicasWithRG(t *testing.T) {
 		})
 	}
 }
+
+func TestAddNodesToCollectionsInRGFailed(t *testing.T) {
+	Params.Init()
+
+	store := meta.NewMockStore(t)
+	store.EXPECT().SaveCollection(mock.Anything).Return(nil)
+	store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(4)
+	store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
+	nodeMgr := session.NewNodeManager()
+	m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
+	m.ResourceManager.AddResourceGroup("rg")
+	m.CollectionManager.PutCollection(CreateTestCollection(1, 2))
+	m.CollectionManager.PutCollection(CreateTestCollection(2, 2))
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            1,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            2,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            3,
+			CollectionID:  2,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            4,
+			CollectionID:  2,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	storeErr := errors.New("store error")
+	store.EXPECT().SaveReplica(mock.Anything).Return(storeErr)
+	AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...)
+
+	assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 0)
+	assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 0)
+	assert.Len(t, m.ReplicaManager.Get(3).GetNodes(), 0)
+	assert.Len(t, m.ReplicaManager.Get(4).GetNodes(), 0)
+}
+
+func TestAddNodesToCollectionsInRG(t *testing.T) {
+	Params.Init()
+
+	store := meta.NewMockStore(t)
+	store.EXPECT().SaveCollection(mock.Anything).Return(nil)
+	store.EXPECT().SaveReplica(mock.Anything).Return(nil)
+	store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
+	nodeMgr := session.NewNodeManager()
+	m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
+	m.ResourceManager.AddResourceGroup("rg")
+	m.CollectionManager.PutCollection(CreateTestCollection(1, 2))
+	m.CollectionManager.PutCollection(CreateTestCollection(2, 2))
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            1,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            2,
+			CollectionID:  1,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            3,
+			CollectionID:  2,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	m.ReplicaManager.Put(meta.NewReplica(
+		&querypb.Replica{
+			ID:            4,
+			CollectionID:  2,
+			Nodes:         []int64{},
+			ResourceGroup: "rg",
+		},
+		typeutil.NewUniqueSet(),
+	))
+
+	AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...)
+
+	assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 2)
+	assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 2)
+	assert.Len(t, m.ReplicaManager.Get(3).GetNodes(), 2)
+	assert.Len(t, m.ReplicaManager.Get(4).GetNodes(), 2)
+}