enhance: declarative resource group api (#31930)

issue: #30647

- Add a declarative resource group API (a usage sketch follows below)

- Add configuration support for resource group management

- Enhance resource group recovery
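
For reference, here is a minimal client-side sketch of the declarative call. It uses only the message shapes visible in this diff; the `updater` interface and `declareResourceGroups` helper are illustrative names, not part of this PR.

package example

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
)

// updater mirrors the UpdateResourceGroups method added to the proxy and
// QueryCoord in this PR; any of those components can stand behind it.
type updater interface {
	UpdateResourceGroups(ctx context.Context, req *milvuspb.UpdateResourceGroupsRequest) (*commonpb.Status, error)
}

// declareResourceGroups declares the desired state of a resource group in one
// call; the server reconciles node assignment toward requests/limits instead
// of the caller issuing imperative node transfers.
func declareResourceGroups(ctx context.Context, c updater) (*commonpb.Status, error) {
	return c.UpdateResourceGroups(ctx, &milvuspb.UpdateResourceGroupsRequest{
		ResourceGroups: map[string]*rgpb.ResourceGroupConfig{
			"rg1": {
				Requests: &rgpb.ResourceGroupLimit{NodeNum: 2}, // lower bound, filled by recovery
				Limits:   &rgpb.ResourceGroupLimit{NodeNum: 3}, // upper bound, extra nodes are released
				// Optional preferred transfer peer.
				TransferFrom: []*rgpb.ResourceGroupTransfer{{ResourceGroup: "__default_resource_group"}},
			},
		},
	})
}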

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/32018/head
chyezh 2024-04-15 08:13:19 +08:00 committed by GitHub
parent e26cc9bfb5
commit 48fe977a9d
41 changed files with 2856 additions and 1262 deletions


@ -1150,6 +1150,10 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
return s.proxy.CreateResourceGroup(ctx, req)
}
func (s *Server) UpdateResourceGroups(ctx context.Context, req *milvuspb.UpdateResourceGroupsRequest) (*commonpb.Status, error) {
return s.proxy.UpdateResourceGroups(ctx, req)
}
func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
return s.proxy.DropResourceGroup(ctx, req)
}


@ -315,6 +315,17 @@ func (c *Client) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
})
}
func (c *Client) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateResourceGroupsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
return client.UpdateResourceGroups(ctx, req)
})
}
func (c *Client) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(


@ -415,6 +415,10 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
return s.queryCoord.CreateResourceGroup(ctx, req)
}
func (s *Server) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error) {
return s.queryCoord.UpdateResourceGroups(ctx, req)
}
func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
return s.queryCoord.DropResourceGroup(ctx, req)
}


@ -2394,6 +2394,61 @@ func (_c *MockQueryCoord_TransferSegment_Call) RunAndReturn(run func(context.Con
return _c
}
// UpdateResourceGroups provides a mock function with given fields: _a0, _a1
func (_m *MockQueryCoord) UpdateResourceGroups(_a0 context.Context, _a1 *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateResourceGroupsRequest) *commonpb.Status); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.UpdateResourceGroupsRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryCoord_UpdateResourceGroups_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateResourceGroups'
type MockQueryCoord_UpdateResourceGroups_Call struct {
*mock.Call
}
// UpdateResourceGroups is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.UpdateResourceGroupsRequest
func (_e *MockQueryCoord_Expecter) UpdateResourceGroups(_a0 interface{}, _a1 interface{}) *MockQueryCoord_UpdateResourceGroups_Call {
return &MockQueryCoord_UpdateResourceGroups_Call{Call: _e.mock.On("UpdateResourceGroups", _a0, _a1)}
}
func (_c *MockQueryCoord_UpdateResourceGroups_Call) Run(run func(_a0 context.Context, _a1 *querypb.UpdateResourceGroupsRequest)) *MockQueryCoord_UpdateResourceGroups_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.UpdateResourceGroupsRequest))
})
return _c
}
func (_c *MockQueryCoord_UpdateResourceGroups_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoord_UpdateResourceGroups_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryCoord_UpdateResourceGroups_Call) RunAndReturn(run func(context.Context, *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error)) *MockQueryCoord_UpdateResourceGroups_Call {
_c.Call.Return(run)
return _c
}
// UpdateStateCode provides a mock function with given fields: stateCode
func (_m *MockQueryCoord) UpdateStateCode(stateCode commonpb.StateCode) {
_m.Called(stateCode)


@ -2592,6 +2592,76 @@ func (_c *MockQueryCoordClient_TransferSegment_Call) RunAndReturn(run func(conte
return _c
}
// UpdateResourceGroups provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryCoordClient) UpdateResourceGroups(ctx context.Context, in *querypb.UpdateResourceGroupsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateResourceGroupsRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateResourceGroupsRequest, ...grpc.CallOption) *commonpb.Status); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.UpdateResourceGroupsRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryCoordClient_UpdateResourceGroups_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateResourceGroups'
type MockQueryCoordClient_UpdateResourceGroups_Call struct {
*mock.Call
}
// UpdateResourceGroups is a helper method to define mock.On call
// - ctx context.Context
// - in *querypb.UpdateResourceGroupsRequest
// - opts ...grpc.CallOption
func (_e *MockQueryCoordClient_Expecter) UpdateResourceGroups(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryCoordClient_UpdateResourceGroups_Call {
return &MockQueryCoordClient_UpdateResourceGroups_Call{Call: _e.mock.On("UpdateResourceGroups",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockQueryCoordClient_UpdateResourceGroups_Call) Run(run func(ctx context.Context, in *querypb.UpdateResourceGroupsRequest, opts ...grpc.CallOption)) *MockQueryCoordClient_UpdateResourceGroups_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*querypb.UpdateResourceGroupsRequest), variadicArgs...)
})
return _c
}
func (_c *MockQueryCoordClient_UpdateResourceGroups_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoordClient_UpdateResourceGroups_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryCoordClient_UpdateResourceGroups_Call) RunAndReturn(run func(context.Context, *querypb.UpdateResourceGroupsRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockQueryCoordClient_UpdateResourceGroups_Call {
_c.Call.Return(run)
return _c
}
// NewMockQueryCoordClient creates a new instance of MockQueryCoordClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockQueryCoordClient(t interface {


@ -6,6 +6,7 @@ option go_package = "github.com/milvus-io/milvus/internal/proto/querypb";
import "common.proto";
import "milvus.proto";
import "rg.proto";
import "internal.proto";
import "schema.proto";
import "msg.proto";
@ -72,6 +73,9 @@ service QueryCoord {
rpc CreateResourceGroup(milvus.CreateResourceGroupRequest)
returns (common.Status) {
}
rpc UpdateResourceGroups(UpdateResourceGroupsRequest)
returns (common.Status) {
}
rpc DropResourceGroup(milvus.DropResourceGroupRequest)
returns (common.Status) {
}
@ -283,6 +287,11 @@ message GetShardLeadersResponse {
repeated ShardLeadersList shards = 2;
}
message UpdateResourceGroupsRequest {
common.MsgBase base = 1;
map<string, rg.ResourceGroupConfig> resource_groups = 2;
}
message ShardLeadersList { // All leaders of all replicas of one shard
string channel_name = 1;
repeated int64 node_ids = 2;
@ -693,8 +702,9 @@ message SyncDistributionRequest {
message ResourceGroup {
string name = 1;
int32 capacity = 2;
int32 capacity = 2 [deprecated = true]; // capacity can be found in config.requests.nodeNum and config.limits.nodeNum.
repeated int64 nodes = 3;
rg.ResourceGroupConfig config = 4;
}
// transfer `replicaNum` replicas in `collectionID` from `source_resource_group` to `target_resource_groups`
@ -718,15 +728,19 @@ message DescribeResourceGroupResponse {
message ResourceGroupInfo {
string name = 1;
int32 capacity = 2;
int32 capacity = 2 [deprecated = true]; // capacity can be found in config.requests.nodeNum and config.limits.nodeNum.
int32 num_available_node = 3;
// collection id -> loaded replica num
map<int64, int32> num_loaded_replica = 4;
// collection id -> accessed other rg's node num
map<int64, int32> num_outgoing_node = 5;
// collection id -> be accessed node num by other rg
map<int64, int32> num_incoming_node = 6;
// resource group configuration.
rg.ResourceGroupConfig config = 7;
repeated common.NodeInfo nodes = 8;
}
message DeleteRequest {
common.MsgBase base = 1;
int64 collection_id = 2;


@ -5291,6 +5291,65 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr
return t.result, nil
}
func (node *Proxy) UpdateResourceGroups(ctx context.Context, request *milvuspb.UpdateResourceGroupsRequest) (*commonpb.Status, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return merr.Status(err), nil
}
method := "UpdateResourceGroups"
for name := range request.GetResourceGroups() {
if err := ValidateResourceGroupName(name); err != nil {
log.Warn("UpdateResourceGroups failed",
zap.Error(err),
)
return getErrResponse(err, method, "", ""), nil
}
}
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-UpdateResourceGroups")
defer sp.End()
tr := timerecord.NewTimeRecorder(method)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, "", "").Inc()
t := &UpdateResourceGroupsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
UpdateResourceGroupsRequest: request,
queryCoord: node.queryCoord,
}
log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
)
log.Info("UpdateResourceGroups received")
if err := node.sched.ddQueue.Enqueue(t); err != nil {
log.Warn("UpdateResourceGroups failed to enqueue",
zap.Error(err))
return getErrResponse(err, method, "", ""), nil
}
log.Debug("UpdateResourceGroups enqueued",
zap.Uint64("BeginTS", t.BeginTs()),
zap.Uint64("EndTS", t.EndTs()))
if err := t.WaitToFinish(); err != nil {
log.Warn("UpdateResourceGroups failed to WaitToFinish",
zap.Error(err),
zap.Uint64("BeginTS", t.BeginTs()),
zap.Uint64("EndTS", t.EndTs()))
return getErrResponse(err, method, "", ""), nil
}
log.Info("UpdateResourceGroups done",
zap.Uint64("BeginTS", t.BeginTs()),
zap.Uint64("EndTS", t.EndTs()))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, "", "").Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return t.result, nil
}
func getErrResponse(err error, method string, dbName string, collectionName string) *commonpb.Status {
metrics.ProxyFunctionCall.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, dbName, collectionName).Inc()


@ -79,6 +79,7 @@ const (
AlterCollectionTaskName = "AlterCollectionTask"
UpsertTaskName = "UpsertTask"
CreateResourceGroupTaskName = "CreateResourceGroupTask"
UpdateResourceGroupsTaskName = "UpdateResourceGroupsTask"
DropResourceGroupTaskName = "DropResourceGroupTask"
TransferNodeTaskName = "TransferNodeTask"
TransferReplicaTaskName = "TransferReplicaTask"
@ -2030,6 +2031,74 @@ func (t *CreateResourceGroupTask) PostExecute(ctx context.Context) error {
return nil
}
type UpdateResourceGroupsTask struct {
baseTask
Condition
*milvuspb.UpdateResourceGroupsRequest
ctx context.Context
queryCoord types.QueryCoordClient
result *commonpb.Status
}
func (t *UpdateResourceGroupsTask) TraceCtx() context.Context {
return t.ctx
}
func (t *UpdateResourceGroupsTask) ID() UniqueID {
return t.Base.MsgID
}
func (t *UpdateResourceGroupsTask) SetID(uid UniqueID) {
t.Base.MsgID = uid
}
func (t *UpdateResourceGroupsTask) Name() string {
return UpdateResourceGroupsTaskName
}
func (t *UpdateResourceGroupsTask) Type() commonpb.MsgType {
return t.Base.MsgType
}
func (t *UpdateResourceGroupsTask) BeginTs() Timestamp {
return t.Base.Timestamp
}
func (t *UpdateResourceGroupsTask) EndTs() Timestamp {
return t.Base.Timestamp
}
func (t *UpdateResourceGroupsTask) SetTs(ts Timestamp) {
t.Base.Timestamp = ts
}
func (t *UpdateResourceGroupsTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
return nil
}
func (t *UpdateResourceGroupsTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_UpdateResourceGroups
t.Base.SourceID = paramtable.GetNodeID()
return nil
}
func (t *UpdateResourceGroupsTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.queryCoord.UpdateResourceGroups(ctx, &querypb.UpdateResourceGroupsRequest{
Base: t.UpdateResourceGroupsRequest.GetBase(),
ResourceGroups: t.UpdateResourceGroupsRequest.GetResourceGroups(),
})
return err
}
func (t *UpdateResourceGroupsTask) PostExecute(ctx context.Context) error {
return nil
}
type DropResourceGroupTask struct {
baseTask
Condition
@ -2202,6 +2271,8 @@ func (t *DescribeResourceGroupTask) Execute(ctx context.Context) error {
NumLoadedReplica: numLoadedReplica,
NumOutgoingNode: numOutgoingNode,
NumIncomingNode: numIncomingNode,
Config: rgInfo.Config,
Nodes: rgInfo.Nodes,
},
}
} else {


@ -477,7 +477,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
@ -688,7 +688,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans)
@ -831,10 +831,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
suite.balancer.nodeManager.Add(nodeInfo)
}
// make node-3 outbound
err := balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
suite.NoError(err)
err = balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
suite.NoError(err)
balancer.meta.ResourceManager.HandleNodeUp(1)
balancer.meta.ResourceManager.HandleNodeUp(2)
utils.RecoverAllCollection(balancer.meta)
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans)
@ -1063,7 +1061,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
Params.Save(Params.QueryCoordCfg.AutoBalanceChannel.Key, fmt.Sprint(c.enableBalanceChannel))
@ -1192,7 +1190,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestMultiReplicaBalance() {
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(nodes[i])
}
}


@ -436,7 +436,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
// 4. balance and verify result
@ -548,7 +548,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
})
nodeInfo.SetState(balanceCase.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, balanceCase.nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(balanceCase.nodes[i])
}
// 4. first round balance
@ -697,11 +697,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
for i := range c.outBoundNodes {
suite.balancer.meta.ResourceManager.UnassignNode(meta.DefaultResourceGroupName, c.outBoundNodes[i])
suite.balancer.meta.ResourceManager.HandleNodeDown(c.outBoundNodes[i])
}
utils.RecoverAllCollection(balancer.meta)
@ -817,7 +817,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestMultiReplicaBalance() {
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, nodes[i])
suite.balancer.meta.ResourceManager.HandleNodeUp(nodes[i])
}
}


@ -98,8 +98,8 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
Address: "localhost",
Hostname: "localhost",
}))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
suite.checker.meta.ResourceManager.HandleNodeUp(int64(nodeID1))
suite.checker.meta.ResourceManager.HandleNodeUp(int64(nodeID2))
// set collections meta
segments := []*datapb.SegmentInfo{
@ -175,8 +175,8 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
Address: "localhost",
Hostname: "localhost",
}))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
suite.checker.meta.ResourceManager.HandleNodeUp(int64(nodeID1))
suite.checker.meta.ResourceManager.HandleNodeUp(int64(nodeID2))
segments := []*datapb.SegmentInfo{
{
@ -239,8 +239,8 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
Hostname: "localhost",
}))
suite.nodeMgr.Stopping(int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
suite.checker.meta.ResourceManager.HandleNodeUp(int64(nodeID1))
suite.checker.meta.ResourceManager.HandleNodeUp(int64(nodeID2))
segments := []*datapb.SegmentInfo{
{
@ -299,18 +299,20 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
// set up nodes info, stopping node1
nodeID1, nodeID2 := 1, 2
nodeID1, nodeID2 := int64(1), int64(2)
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID1),
Address: "localhost",
NodeID: nodeID1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID2),
Address: "localhost",
NodeID: nodeID2,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Stopping(int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
suite.nodeMgr.Stopping(nodeID1)
suite.checker.meta.ResourceManager.HandleNodeUp(nodeID1)
suite.checker.meta.ResourceManager.HandleNodeUp(nodeID2)
segments := []*datapb.SegmentInfo{
{
@ -331,7 +333,7 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
cid1, replicaID1, partitionID1 := 1, 1, 1
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
collection1.Status = querypb.LoadStatus_Loaded
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{nodeID1, nodeID2})
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(collection1, partition1)
suite.checker.meta.ReplicaManager.Put(replica1)
@ -341,7 +343,7 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
collection2.Status = querypb.LoadStatus_Loaded
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{nodeID1, nodeID2})
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
suite.checker.meta.CollectionManager.PutCollection(collection2, partition2)
suite.checker.meta.ReplicaManager.Put(replica2)


@ -126,7 +126,7 @@ func (suite *ChannelCheckerTestSuite) TestLoadChannel() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.HandleNodeUp(1)
channels := []*datapb.VchannelInfo{
{


@ -99,8 +99,8 @@ func (suite *CheckerControllerSuite) TestBasic() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
suite.meta.ResourceManager.HandleNodeUp(1)
suite.meta.ResourceManager.HandleNodeUp(2)
// set target
channels := []*datapb.VchannelInfo{


@ -98,8 +98,8 @@ func (suite *IndexCheckerSuite) TestLoadIndex() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// dist
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
@ -159,8 +159,8 @@ func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// dist
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
@ -219,8 +219,8 @@ func (suite *IndexCheckerSuite) TestGetIndexInfoFailed() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// dist
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
@ -258,8 +258,8 @@ func (suite *IndexCheckerSuite) TestCreateNewIndex() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// dist
segment := utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")


@ -119,8 +119,8 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// set target
segments := []*datapb.SegmentInfo{
@ -184,8 +184,8 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// set target
segments := []*datapb.SegmentInfo{
@ -269,8 +269,8 @@ func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// set target
segments := []*datapb.SegmentInfo{
@ -343,8 +343,8 @@ func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
checker.meta.ResourceManager.HandleNodeUp(1)
checker.meta.ResourceManager.HandleNodeUp(2)
// set target
segments := []*datapb.SegmentInfo{


@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
@ -185,12 +186,10 @@ func (suite *JobSuite) SetupTest() {
Address: "localhost",
Hostname: "localhost",
}))
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 1000)
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 2000)
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 3000)
suite.NoError(err)
suite.meta.HandleNodeUp(1000)
suite.meta.HandleNodeUp(2000)
suite.meta.HandleNodeUp(3000)
suite.checkerController = &checkers.CheckerController{}
}
@ -318,9 +317,18 @@ func (suite *JobSuite) TestLoadCollection() {
suite.NoError(err)
}
suite.meta.ResourceManager.AddResourceGroup("rg1")
suite.meta.ResourceManager.AddResourceGroup("rg2")
suite.meta.ResourceManager.AddResourceGroup("rg3")
cfg := &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 0,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 0,
},
}
suite.meta.ResourceManager.AddResourceGroup("rg1", cfg)
suite.meta.ResourceManager.AddResourceGroup("rg2", cfg)
suite.meta.ResourceManager.AddResourceGroup("rg3", cfg)
// Load with 3 replica on 1 rg
req := &querypb.LoadCollectionRequest{
@ -597,9 +605,17 @@ func (suite *JobSuite) TestLoadPartition() {
suite.NoError(err)
}
suite.meta.ResourceManager.AddResourceGroup("rg1")
suite.meta.ResourceManager.AddResourceGroup("rg2")
suite.meta.ResourceManager.AddResourceGroup("rg3")
cfg := &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
}
suite.meta.ResourceManager.AddResourceGroup("rg1", cfg)
suite.meta.ResourceManager.AddResourceGroup("rg2", cfg)
suite.meta.ResourceManager.AddResourceGroup("rg3", cfg)
// test load 3 replica in 1 rg, should pass rg check
req := &querypb.LoadPartitionsRequest{
@ -1091,12 +1107,9 @@ func (suite *JobSuite) TestLoadCollectionStoreFailed() {
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), store, suite.nodeMgr)
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
err := suite.meta.AssignNode(meta.DefaultResourceGroupName, 1000)
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 2000)
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 3000)
suite.NoError(err)
suite.meta.HandleNodeUp(1000)
suite.meta.HandleNodeUp(2000)
suite.meta.HandleNodeUp(3000)
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
@ -1134,14 +1147,11 @@ func (suite *JobSuite) TestLoadPartitionStoreFailed() {
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), store, suite.nodeMgr)
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
err := suite.meta.AssignNode(meta.DefaultResourceGroupName, 1000)
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 2000)
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 3000)
suite.NoError(err)
suite.meta.HandleNodeUp(1000)
suite.meta.HandleNodeUp(2000)
suite.meta.HandleNodeUp(3000)
err = errors.New("failed to store collection")
err := errors.New("failed to store collection")
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue


@ -0,0 +1,237 @@
package meta
import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var (
DefaultResourceGroupName = "__default_resource_group"
defaultResourceGroupCapacity int32 = 1000000
resourceGroupTransferBoost = 10000
)
// newResourceGroupConfig creates a new resource group config.
func newResourceGroupConfig(request int32, limit int32) *rgpb.ResourceGroupConfig {
return &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: request,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: limit,
},
TransferFrom: make([]*rgpb.ResourceGroupTransfer, 0),
TransferTo: make([]*rgpb.ResourceGroupTransfer, 0),
}
}
type ResourceGroup struct {
name string
nodes typeutil.UniqueSet
cfg *rgpb.ResourceGroupConfig
}
// NewResourceGroup create resource group.
func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig) *ResourceGroup {
rg := &ResourceGroup{
name: name,
nodes: typeutil.NewUniqueSet(),
cfg: cfg,
}
return rg
}
// NewResourceGroupFromMeta create resource group from meta.
func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup {
// Backward compatibility, recover the config from capacity.
if meta.Config == nil {
// If meta.Config is nil, which means the meta is from old version.
// DefaultResourceGroup has special configuration.
if meta.Name == DefaultResourceGroupName {
meta.Config = newResourceGroupConfig(0, meta.Capacity)
} else {
meta.Config = newResourceGroupConfig(meta.Capacity, meta.Capacity)
}
}
rg := NewResourceGroup(meta.Name, meta.Config)
for _, node := range meta.GetNodes() {
rg.nodes.Insert(node)
}
return rg
}
// GetName return resource group name.
func (rg *ResourceGroup) GetName() string {
return rg.name
}
// Deprecated: GetCapacity returns the resource group capacity.
func (rg *ResourceGroup) GetCapacity() int {
// Forward compatibility, recover the capacity from configuration.
capacity := rg.cfg.Requests.NodeNum
if rg.GetName() == DefaultResourceGroupName {
// Default resource group's capacity is always DefaultResourceGroupCapacity.
capacity = defaultResourceGroupCapacity
}
return int(capacity)
}
// GetConfig returns the resource group config.
// Do not change the returned config directly; use CopyForWrite/UpdateConfig to update it.
func (rg *ResourceGroup) GetConfig() *rgpb.ResourceGroupConfig {
return rg.cfg
}
// GetConfigCloned return a cloned resource group config.
func (rg *ResourceGroup) GetConfigCloned() *rgpb.ResourceGroupConfig {
return proto.Clone(rg.cfg).(*rgpb.ResourceGroupConfig)
}
// GetNodes return nodes of resource group.
func (rg *ResourceGroup) GetNodes() []int64 {
return rg.nodes.Collect()
}
// NodeNum return node count of resource group.
func (rg *ResourceGroup) NodeNum() int {
return rg.nodes.Len()
}
// ContainNode return whether resource group contain node.
func (rg *ResourceGroup) ContainNode(id int64) bool {
return rg.nodes.Contain(id)
}
// OversizedNumOfNodes returns the oversized node count: `len(nodes) - requests`.
func (rg *ResourceGroup) OversizedNumOfNodes() int {
oversized := rg.nodes.Len() - int(rg.cfg.Requests.NodeNum)
if oversized < 0 {
return 0
}
return oversized
}
// MissingNumOfNodes returns the missing node count: `requests - len(nodes)`.
func (rg *ResourceGroup) MissingNumOfNodes() int {
missing := int(rg.cfg.Requests.NodeNum) - len(rg.nodes)
if missing < 0 {
return 0
}
return missing
}
// ReachLimitNumOfNodes returns how many nodes can still be added before reaching the limit: `limits - len(nodes)`.
func (rg *ResourceGroup) ReachLimitNumOfNodes() int {
reachLimit := int(rg.cfg.Limits.NodeNum) - len(rg.nodes)
if reachLimit < 0 {
return 0
}
return reachLimit
}
// RedundantNumOfNodes returns the redundant node count: `len(nodes) - limits`.
func (rg *ResourceGroup) RedundantNumOfNodes() int {
redundant := len(rg.nodes) - int(rg.cfg.Limits.NodeNum)
if redundant < 0 {
return 0
}
return redundant
}
// HasFrom return whether given resource group is in `from` of rg.
func (rg *ResourceGroup) HasFrom(rgName string) bool {
for _, from := range rg.cfg.GetTransferFrom() {
if from.ResourceGroup == rgName {
return true
}
}
return false
}
// HasTo return whether given resource group is in `to` of rg.
func (rg *ResourceGroup) HasTo(rgName string) bool {
for _, to := range rg.cfg.GetTransferTo() {
if to.ResourceGroup == rgName {
return true
}
}
return false
}
// GetMeta return resource group meta.
func (rg *ResourceGroup) GetMeta() *querypb.ResourceGroup {
capacity := rg.GetCapacity()
return &querypb.ResourceGroup{
Name: rg.name,
Capacity: int32(capacity),
Nodes: rg.nodes.Collect(),
Config: rg.GetConfigCloned(),
}
}
// Snapshot return a snapshot of resource group.
func (rg *ResourceGroup) Snapshot() *ResourceGroup {
return &ResourceGroup{
name: rg.name,
nodes: rg.nodes.Clone(),
cfg: rg.GetConfigCloned(),
}
}
// MeetRequirement returns whether the resource group meets its requirement.
// It returns an error with the reason if the requirement is not met.
func (rg *ResourceGroup) MeetRequirement() error {
// if len(node) is less than requests, new node need to be assigned.
if rg.nodes.Len() < int(rg.cfg.Requests.NodeNum) {
return errors.Errorf(
"has %d nodes, less than request %d",
rg.nodes.Len(),
rg.cfg.Requests.NodeNum,
)
}
// if len(nodes) is greater than limits, nodes need to be removed.
if rg.nodes.Len() > int(rg.cfg.Limits.NodeNum) {
return errors.Errorf(
"has %d nodes, greater than limit %d",
rg.nodes.Len(),
rg.cfg.Limits.NodeNum,
)
}
return nil
}
// CopyForWrite return a mutable resource group.
func (rg *ResourceGroup) CopyForWrite() *mutableResourceGroup {
return &mutableResourceGroup{ResourceGroup: rg.Snapshot()}
}
// mutableResourceGroup is a mutable (copy-on-write) type for manipulating resource group meta info in the resource manager.
type mutableResourceGroup struct {
*ResourceGroup
}
// UpdateConfig update resource group config.
func (r *mutableResourceGroup) UpdateConfig(cfg *rgpb.ResourceGroupConfig) {
r.cfg = cfg
}
// Assign node to resource group.
func (r *mutableResourceGroup) AssignNode(id int64) {
r.nodes.Insert(id)
}
// Unassign node from resource group.
func (r *mutableResourceGroup) UnassignNode(id int64) {
r.nodes.Remove(id)
}
// ToResourceGroup returns the updated resource group. After calling this method, the mutable resource group must not be used again.
func (r *mutableResourceGroup) ToResourceGroup() *ResourceGroup {
rg := r.ResourceGroup
r.ResourceGroup = nil
return rg
}
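
For readers skimming the diff, the copy-on-write flow above is used roughly as follows. This is an illustrative sketch inside package meta (persistence and locking, which the resource manager adds on top, are omitted; the function name is hypothetical):

// exampleDeclarativeUpdate shows the read-modify-commit flow of ResourceGroup.
func exampleDeclarativeUpdate() {
	rg := NewResourceGroup("rg1", newResourceGroupConfig(1, 2))

	mrg := rg.CopyForWrite()
	mrg.UpdateConfig(newResourceGroupConfig(2, 3)) // declare new requests/limits
	mrg.AssignNode(1)                              // node 1 joins the group
	rg = mrg.ToResourceGroup()                     // mrg must not be reused after this

	if err := rg.MeetRequirement(); err != nil {
		// One node is still missing (2 requested, only 1 assigned);
		// the recovery path assigns more nodes as they come up.
		_ = err
	}
}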


@ -0,0 +1,334 @@
package meta
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
func TestResourceGroup(t *testing.T) {
cfg := &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 2,
},
TransferFrom: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg2",
}},
TransferTo: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg3",
}},
}
rg := NewResourceGroup("rg1", cfg)
cfg2 := rg.GetConfig()
assert.Equal(t, cfg.Requests.NodeNum, cfg2.Requests.NodeNum)
assertion := func() {
assert.Equal(t, "rg1", rg.GetName())
assert.Empty(t, rg.GetNodes())
assert.Zero(t, rg.NodeNum())
assert.Zero(t, rg.OversizedNumOfNodes())
assert.Zero(t, rg.RedundantNumOfNodes())
assert.Equal(t, 1, rg.MissingNumOfNodes())
assert.Equal(t, 2, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg2"))
assert.False(t, rg.HasFrom("rg3"))
assert.True(t, rg.HasTo("rg3"))
assert.False(t, rg.HasTo("rg2"))
assert.False(t, rg.ContainNode(1))
assert.Error(t, rg.MeetRequirement())
}
assertion()
// Test Txn
mrg := rg.CopyForWrite()
cfg = &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 2,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 3,
},
TransferFrom: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg3",
}},
TransferTo: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg2",
}},
}
mrg.UpdateConfig(cfg)
// nothing happens before commit.
assertion()
rg = mrg.ToResourceGroup()
assertion = func() {
assert.Equal(t, "rg1", rg.GetName())
assert.Empty(t, rg.GetNodes())
assert.Zero(t, rg.NodeNum())
assert.Zero(t, rg.OversizedNumOfNodes())
assert.Zero(t, rg.RedundantNumOfNodes())
assert.Equal(t, 2, rg.MissingNumOfNodes())
assert.Equal(t, 3, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.True(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.False(t, rg.ContainNode(1))
assert.Error(t, rg.MeetRequirement())
}
assertion()
// Test AssignNode
mrg = rg.CopyForWrite()
mrg.AssignNode(1)
mrg.AssignNode(1)
assertion()
rg = mrg.ToResourceGroup()
assertion = func() {
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1}, rg.GetNodes())
assert.Equal(t, 1, rg.NodeNum())
assert.Zero(t, rg.OversizedNumOfNodes())
assert.Zero(t, rg.RedundantNumOfNodes())
assert.Equal(t, 1, rg.MissingNumOfNodes())
assert.Equal(t, 2, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.True(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.Error(t, rg.MeetRequirement())
}
assertion()
// Test AssignNode until the requirement is met.
mrg = rg.CopyForWrite()
mrg.AssignNode(2)
assertion()
rg = mrg.ToResourceGroup()
assertion = func() {
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
assert.Zero(t, rg.OversizedNumOfNodes())
assert.Zero(t, rg.RedundantNumOfNodes())
assert.Equal(t, 0, rg.MissingNumOfNodes())
assert.Equal(t, 1, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.True(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.True(t, rg.ContainNode(2))
assert.NoError(t, rg.MeetRequirement())
}
assertion()
// Test AssignNode until the requirement is exceeded.
mrg = rg.CopyForWrite()
mrg.AssignNode(3)
mrg.AssignNode(4)
assertion()
rg = mrg.ToResourceGroup()
assertion = func() {
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2, 3, 4}, rg.GetNodes())
assert.Equal(t, 4, rg.NodeNum())
assert.Equal(t, 2, rg.OversizedNumOfNodes())
assert.Equal(t, 1, rg.RedundantNumOfNodes())
assert.Equal(t, 0, rg.MissingNumOfNodes())
assert.Equal(t, 0, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.True(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.True(t, rg.ContainNode(2))
assert.True(t, rg.ContainNode(3))
assert.True(t, rg.ContainNode(4))
assert.Error(t, rg.MeetRequirement())
}
assertion()
// Test UnassignNode.
mrg = rg.CopyForWrite()
mrg.UnassignNode(3)
assertion()
rg = mrg.ToResourceGroup()
rgMeta := rg.GetMeta()
assert.Equal(t, 3, len(rgMeta.Nodes))
assert.Equal(t, "rg1", rgMeta.Name)
assert.Equal(t, "rg3", rgMeta.Config.TransferFrom[0].ResourceGroup)
assert.Equal(t, "rg2", rgMeta.Config.TransferTo[0].ResourceGroup)
assert.Equal(t, int32(2), rgMeta.Config.Requests.NodeNum)
assert.Equal(t, int32(3), rgMeta.Config.Limits.NodeNum)
assertion2 := func(rg *ResourceGroup) {
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2, 4}, rg.GetNodes())
assert.Equal(t, 3, rg.NodeNum())
assert.Equal(t, 1, rg.OversizedNumOfNodes())
assert.Equal(t, 0, rg.RedundantNumOfNodes())
assert.Equal(t, 0, rg.MissingNumOfNodes())
assert.Equal(t, 0, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.True(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.True(t, rg.ContainNode(2))
assert.False(t, rg.ContainNode(3))
assert.True(t, rg.ContainNode(4))
assert.NoError(t, rg.MeetRequirement())
}
assertion2(rg)
// Snapshot does not change the original resource group.
snapshot := rg.Snapshot()
assertion2(snapshot)
snapshot.cfg = nil
snapshot.name = "rg2"
snapshot.nodes = nil
assertion2(rg)
}
func TestResourceGroupMeta(t *testing.T) {
rgMeta := &querypb.ResourceGroup{
Name: "rg1",
Capacity: 1,
Nodes: []int64{1, 2},
}
rg := NewResourceGroupFromMeta(rgMeta)
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
assert.Equal(t, 1, rg.OversizedNumOfNodes())
assert.Equal(t, 1, rg.RedundantNumOfNodes())
assert.Equal(t, 0, rg.MissingNumOfNodes())
assert.Equal(t, 0, rg.ReachLimitNumOfNodes())
assert.False(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.False(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.True(t, rg.ContainNode(2))
assert.False(t, rg.ContainNode(3))
assert.False(t, rg.ContainNode(4))
assert.Error(t, rg.MeetRequirement())
rgMeta = &querypb.ResourceGroup{
Name: "rg1",
Capacity: 1,
Nodes: []int64{1, 2, 4},
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 2,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 3,
},
TransferFrom: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg3",
}},
TransferTo: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg2",
}},
},
}
rg = NewResourceGroupFromMeta(rgMeta)
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2, 4}, rg.GetNodes())
assert.Equal(t, 3, rg.NodeNum())
assert.Equal(t, 1, rg.OversizedNumOfNodes())
assert.Equal(t, 0, rg.RedundantNumOfNodes())
assert.Equal(t, 0, rg.MissingNumOfNodes())
assert.Equal(t, 0, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.True(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.True(t, rg.ContainNode(2))
assert.False(t, rg.ContainNode(3))
assert.True(t, rg.ContainNode(4))
assert.NoError(t, rg.MeetRequirement())
newMeta := rg.GetMeta()
assert.Equal(t, int32(2), newMeta.Capacity)
// Recover Default Resource Group.
rgMeta = &querypb.ResourceGroup{
Name: DefaultResourceGroupName,
Capacity: defaultResourceGroupCapacity,
Nodes: []int64{1, 2},
}
rg = NewResourceGroupFromMeta(rgMeta)
assert.Equal(t, DefaultResourceGroupName, rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
assert.Equal(t, 2, rg.OversizedNumOfNodes())
assert.Equal(t, 0, rg.RedundantNumOfNodes())
assert.Equal(t, 0, rg.MissingNumOfNodes())
assert.Equal(t, int(defaultResourceGroupCapacity-2), rg.ReachLimitNumOfNodes())
assert.False(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.False(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.True(t, rg.ContainNode(2))
assert.False(t, rg.ContainNode(3))
assert.False(t, rg.ContainNode(4))
assert.NoError(t, rg.MeetRequirement())
newMeta = rg.GetMeta()
assert.Equal(t, defaultResourceGroupCapacity, newMeta.Capacity)
// Recover Default Resource Group.
rgMeta = &querypb.ResourceGroup{
Name: DefaultResourceGroupName,
Nodes: []int64{1, 2},
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 2,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 3,
},
TransferFrom: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg3",
}},
TransferTo: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg2",
}},
},
}
rg = NewResourceGroupFromMeta(rgMeta)
assert.Equal(t, DefaultResourceGroupName, rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
assert.Equal(t, 0, rg.OversizedNumOfNodes())
assert.Equal(t, 0, rg.RedundantNumOfNodes())
assert.Equal(t, 0, rg.MissingNumOfNodes())
assert.Equal(t, 1, rg.ReachLimitNumOfNodes())
assert.True(t, rg.HasFrom("rg3"))
assert.False(t, rg.HasFrom("rg2"))
assert.True(t, rg.HasTo("rg2"))
assert.False(t, rg.HasTo("rg3"))
assert.True(t, rg.ContainNode(1))
assert.True(t, rg.ContainNode(2))
assert.False(t, rg.ContainNode(3))
assert.False(t, rg.ContainNode(4))
assert.NoError(t, rg.MeetRequirement())
newMeta = rg.GetMeta()
assert.Equal(t, int32(1000000), newMeta.Capacity)
}

File diff suppressed because it is too large.


@ -18,21 +18,17 @@ package meta
import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ResourceManagerSuite struct {
@ -47,7 +43,7 @@ func (suite *ResourceManagerSuite) SetupSuite() {
}
func (suite *ResourceManagerSuite) SetupTest() {
config := GenerateEtcdConfig()
config := params.GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
@ -63,16 +59,111 @@ func (suite *ResourceManagerSuite) SetupTest() {
suite.manager = NewResourceManager(store, session.NewNodeManager())
}
func (suite *ResourceManagerSuite) TearDownSuite() {
suite.kv.Close()
}
func TestResourceManager(t *testing.T) {
suite.Run(t, new(ResourceManagerSuite))
}
func (suite *ResourceManagerSuite) TestValidateConfiguration() {
err := suite.manager.validateResourceGroupConfig("rg1", newResourceGroupConfig(0, 0))
suite.NoError(err)
err = suite.manager.validateResourceGroupConfig("rg1", &rgpb.ResourceGroupConfig{})
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
err = suite.manager.validateResourceGroupConfig("rg1", newResourceGroupConfig(-1, 2))
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
err = suite.manager.validateResourceGroupConfig("rg1", newResourceGroupConfig(2, -1))
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
err = suite.manager.validateResourceGroupConfig("rg1", newResourceGroupConfig(3, 2))
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
cfg := newResourceGroupConfig(0, 0)
cfg.TransferFrom = []*rgpb.ResourceGroupTransfer{{ResourceGroup: "rg1"}}
err = suite.manager.validateResourceGroupConfig("rg1", cfg)
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
cfg = newResourceGroupConfig(0, 0)
cfg.TransferFrom = []*rgpb.ResourceGroupTransfer{{ResourceGroup: "rg2"}}
err = suite.manager.validateResourceGroupConfig("rg1", cfg)
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
cfg = newResourceGroupConfig(0, 0)
cfg.TransferTo = []*rgpb.ResourceGroupTransfer{{ResourceGroup: "rg1"}}
err = suite.manager.validateResourceGroupConfig("rg1", cfg)
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
cfg = newResourceGroupConfig(0, 0)
cfg.TransferTo = []*rgpb.ResourceGroupTransfer{{ResourceGroup: "rg2"}}
err = suite.manager.validateResourceGroupConfig("rg1", cfg)
suite.ErrorIs(err, merr.ErrResourceGroupIllegalConfig)
err = suite.manager.AddResourceGroup("rg2", newResourceGroupConfig(0, 0))
suite.NoError(err)
err = suite.manager.RemoveResourceGroup("rg2")
suite.NoError(err)
}
func (suite *ResourceManagerSuite) TestValidateDelete() {
// Non empty resource group can not be removed.
err := suite.manager.AddResourceGroup("rg1", newResourceGroupConfig(1, 1))
suite.NoError(err)
err = suite.manager.validateResourceGroupIsDeletable(DefaultResourceGroupName)
suite.ErrorIs(err, merr.ErrParameterInvalid)
err = suite.manager.validateResourceGroupIsDeletable("rg1")
suite.ErrorIs(err, merr.ErrParameterInvalid)
cfg := newResourceGroupConfig(0, 0)
cfg.TransferFrom = []*rgpb.ResourceGroupTransfer{{ResourceGroup: "rg1"}}
suite.manager.AddResourceGroup("rg2", cfg)
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(0, 0),
})
err = suite.manager.validateResourceGroupIsDeletable("rg1")
suite.ErrorIs(err, merr.ErrParameterInvalid)
cfg = newResourceGroupConfig(0, 0)
cfg.TransferTo = []*rgpb.ResourceGroupTransfer{{ResourceGroup: "rg1"}}
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg2": cfg,
})
err = suite.manager.validateResourceGroupIsDeletable("rg1")
suite.ErrorIs(err, merr.ErrParameterInvalid)
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg2": newResourceGroupConfig(0, 0),
})
err = suite.manager.validateResourceGroupIsDeletable("rg1")
suite.NoError(err)
err = suite.manager.RemoveResourceGroup("rg1")
suite.NoError(err)
err = suite.manager.RemoveResourceGroup("rg2")
suite.NoError(err)
}
func (suite *ResourceManagerSuite) TestManipulateResourceGroup() {
// test add rg
err := suite.manager.AddResourceGroup("rg1")
err := suite.manager.AddResourceGroup("rg1", newResourceGroupConfig(0, 0))
suite.NoError(err)
suite.True(suite.manager.ContainResourceGroup("rg1"))
suite.Len(suite.manager.ListResourceGroups(), 2)
// test add duplicate rg
err = suite.manager.AddResourceGroup("rg1")
// test add duplicate rg but same configuration is ok
err = suite.manager.AddResourceGroup("rg1", newResourceGroupConfig(0, 0))
suite.NoError(err)
err = suite.manager.AddResourceGroup("rg1", newResourceGroupConfig(1, 1))
suite.Error(err)
// test delete rg
err = suite.manager.RemoveResourceGroup("rg1")
suite.NoError(err)
@ -82,461 +173,305 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() {
suite.NoError(err)
// test delete default rg
err = suite.manager.RemoveResourceGroup(DefaultResourceGroupName)
suite.ErrorIs(ErrDeleteDefaultRG, err)
}
suite.ErrorIs(err, merr.ErrParameterInvalid)
func (suite *ResourceManagerSuite) TestManipulateNode() {
// test delete a rg not empty.
err = suite.manager.AddResourceGroup("rg2", newResourceGroupConfig(1, 1))
suite.NoError(err)
err = suite.manager.RemoveResourceGroup("rg2")
suite.ErrorIs(err, merr.ErrParameterInvalid)
// test delete a rg after update
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg2": newResourceGroupConfig(0, 0),
})
err = suite.manager.RemoveResourceGroup("rg2")
suite.NoError(err)
// assign a node to rg.
err = suite.manager.AddResourceGroup("rg2", newResourceGroupConfig(1, 1))
suite.NoError(err)
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Address: "localhost",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg1")
defer suite.manager.nodeMgr.Remove(1)
suite.manager.HandleNodeUp(1)
err = suite.manager.RemoveResourceGroup("rg2")
suite.ErrorIs(err, merr.ErrParameterInvalid)
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg2": newResourceGroupConfig(0, 0),
})
// RemoveResourceGroup will remove all nodes from the resource group.
err = suite.manager.RemoveResourceGroup("rg2")
suite.NoError(err)
}
func (suite *ResourceManagerSuite) TestNodeUpAndDown() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg1", newResourceGroupConfig(1, 1))
suite.NoError(err)
// test add node to rg
err = suite.manager.AssignNode("rg1", 1)
suite.NoError(err)
suite.manager.HandleNodeUp(1)
suite.Equal(1, suite.manager.GetResourceGroup("rg1").NodeNum())
// test add non-exist node to rg
err = suite.manager.AssignNode("rg1", 2)
suite.ErrorIs(err, merr.ErrNodeNotFound)
// test add node to non-exist rg
err = suite.manager.AssignNode("rg2", 1)
suite.ErrorIs(err, merr.ErrResourceGroupNotFound)
// test remove node from rg
err = suite.manager.UnassignNode("rg1", 1)
err = suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(2, 3),
})
suite.NoError(err)
suite.manager.HandleNodeUp(2)
suite.Equal(1, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// test remove non-exist node from rg
err = suite.manager.UnassignNode("rg1", 2)
// teardown a non-exist node from rg.
suite.manager.HandleNodeDown(2)
suite.Equal(1, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// test add exist node to rg
suite.manager.HandleNodeUp(1)
suite.Equal(1, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// teardown a exist node from rg.
suite.manager.HandleNodeDown(1)
suite.Zero(suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// teardown a exist node from rg.
suite.manager.HandleNodeDown(1)
suite.Zero(suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.manager.HandleNodeUp(1)
suite.Equal(1, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
err = suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(4, 4),
})
suite.NoError(err)
// test remove node from non-exist rg
err = suite.manager.UnassignNode("rg2", 1)
suite.ErrorIs(err, merr.ErrResourceGroupNotFound)
// add node which already assign to rg to another rg
err = suite.manager.AddResourceGroup("rg2")
suite.manager.AddResourceGroup("rg2", newResourceGroupConfig(1, 1))
suite.NoError(err)
err = suite.manager.AssignNode("rg1", 1)
suite.NoError(err)
err = suite.manager.AssignNode("rg2", 1)
suite.ErrorIs(err, ErrNodeAlreadyAssign)
// transfer node between rgs
_, err = suite.manager.TransferNode("rg1", "rg2", 1)
suite.NoError(err)
// transfer meet non exist rg
_, err = suite.manager.TransferNode("rgggg", "rg2", 1)
suite.ErrorIs(err, merr.ErrResourceGroupNotFound)
_, err = suite.manager.TransferNode("rg1", "rg2", 5)
suite.ErrorIs(err, ErrNodeNotEnough)
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 11,
Address: "127.0.0.1:0",
Address: "localhost",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 12,
Address: "127.0.0.1:0",
Address: "localhost",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 13,
Address: "127.0.0.1:0",
Address: "localhost",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 14,
Address: "127.0.0.1:0",
Address: "localhost",
Hostname: "localhost",
}))
suite.manager.AssignNode("rg1", 11)
suite.manager.AssignNode("rg1", 12)
suite.manager.AssignNode("rg1", 13)
suite.manager.AssignNode("rg1", 14)
suite.manager.HandleNodeUp(11)
suite.manager.HandleNodeUp(12)
suite.manager.HandleNodeUp(13)
suite.manager.HandleNodeUp(14)
rg1, err := suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
rg2, err := suite.manager.GetResourceGroup("rg2")
suite.NoError(err)
suite.Equal(rg1.GetCapacity(), 4)
suite.Equal(rg2.GetCapacity(), 1)
suite.manager.TransferNode("rg1", "rg2", 3)
suite.Equal(rg1.GetCapacity(), 1)
suite.Equal(rg2.GetCapacity(), 4)
}
suite.Equal(4, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(1, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
func (suite *ResourceManagerSuite) TestHandleNodeUp() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 100,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 101,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg1")
suite.NoError(err)
suite.manager.AssignNode("rg1", 1)
suite.manager.AssignNode("rg1", 2)
suite.manager.AssignNode("rg1", 3)
// test that when the query node id does not change, the node is assigned back to its original rg
rg, err := suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
suite.Equal(rg.GetCapacity(), 3)
suite.Equal(len(rg.GetNodes()), 3)
suite.manager.HandleNodeUp(1)
suite.Equal(rg.GetCapacity(), 3)
suite.Equal(len(rg.GetNodes()), 3)
suite.manager.HandleNodeDown(2)
rg, err = suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
suite.Equal(rg.GetCapacity(), 3)
suite.Equal(len(rg.GetNodes()), 2)
suite.NoError(err)
defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity())
suite.manager.HandleNodeUp(101)
rg, err = suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
suite.Equal(rg.GetCapacity(), 3)
suite.Equal(len(rg.GetNodes()), 2)
suite.False(suite.manager.ContainsNode("rg1", 101))
suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity())
}
func (suite *ResourceManagerSuite) TestRecover() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg1")
suite.NoError(err)
err = suite.manager.AddResourceGroup("rg2")
suite.NoError(err)
suite.manager.AssignNode(DefaultResourceGroupName, 1)
suite.manager.TransferNode(DefaultResourceGroupName, "rg1", 1)
suite.manager.AssignNode(DefaultResourceGroupName, 2)
suite.manager.TransferNode(DefaultResourceGroupName, "rg2", 1)
suite.manager.AssignNode(DefaultResourceGroupName, 3)
suite.manager.AssignNode(DefaultResourceGroupName, 4)
suite.manager.HandleNodeDown(2)
suite.manager.HandleNodeDown(3)
// clear resource manager state in a hacky way
delete(suite.manager.groups, "rg1")
delete(suite.manager.groups, "rg2")
delete(suite.manager.groups, DefaultResourceGroupName)
suite.manager.Recover()
rg, err := suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
suite.Equal(1, rg.GetCapacity())
suite.True(suite.manager.ContainsNode("rg1", 1))
rg, err = suite.manager.GetResourceGroup("rg2")
suite.NoError(err)
suite.Equal(1, rg.GetCapacity())
suite.False(suite.manager.ContainsNode("rg2", 2))
rg, err = suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(DefaultResourceGroupCapacity, rg.GetCapacity())
suite.False(suite.manager.ContainsNode(DefaultResourceGroupName, 3))
suite.True(suite.manager.ContainsNode(DefaultResourceGroupName, 4))
}
func (suite *ResourceManagerSuite) TestCheckOutboundNodes() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg")
suite.NoError(err)
suite.manager.AssignNode("rg", 1)
suite.manager.AssignNode("rg", 2)
suite.manager.AssignNode("rg", 3)
}
func (suite *ResourceManagerSuite) TestCheckResourceGroup() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg")
suite.NoError(err)
suite.manager.AssignNode("rg", 1)
suite.manager.AssignNode("rg", 2)
suite.manager.AssignNode("rg", 3)
suite.manager.HandleNodeDown(11)
suite.manager.HandleNodeDown(12)
suite.manager.HandleNodeDown(13)
suite.manager.HandleNodeDown(14)
suite.Equal(1, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.manager.HandleNodeDown(1)
lackNodes := suite.manager.CheckLackOfNode("rg")
suite.Equal(lackNodes, 1)
suite.Zero(suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.manager.nodeMgr.Remove(2)
suite.manager.checkRGNodeStatus("rg")
lackNodes = suite.manager.CheckLackOfNode("rg")
suite.Equal(lackNodes, 2)
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(20, 30),
"rg2": newResourceGroupConfig(30, 40),
})
for i := 1; i <= 100; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
}))
suite.manager.HandleNodeUp(int64(i))
}
rg, err := suite.manager.FindResourceGroupByNode(3)
suite.NoError(err)
suite.Equal(rg, "rg")
}
suite.Equal(20, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(30, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(50, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
func (suite *ResourceManagerSuite) TestGetOutboundNode() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.AddResourceGroup("rg")
suite.manager.AddResourceGroup("rg1")
suite.manager.AssignNode("rg", 1)
suite.manager.AssignNode("rg", 2)
suite.manager.AssignNode("rg1", 3)
// down all nodes
for i := 1; i <= 100; i++ {
suite.manager.HandleNodeDown(int64(i))
suite.Equal(100-i, suite.manager.GetResourceGroup("rg1").NodeNum()+
suite.manager.GetResourceGroup("rg2").NodeNum()+
suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
}
replica := NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 100,
ResourceGroup: "rg",
Nodes: []int64{1, 2},
RoNodes: []int64{3},
},
typeutil.NewUniqueSet(1, 2),
)
// if all rgs have reached their limits, nodes should fall back to the default rg.
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(0, 0),
"rg2": newResourceGroupConfig(0, 0),
DefaultResourceGroupName: newResourceGroupConfig(0, 0),
})
outgoingNodes := suite.manager.GetOutgoingNodeNumByReplica(replica)
suite.NotNil(outgoingNodes)
suite.Len(outgoingNodes, 1)
suite.NotNil(outgoingNodes["rg1"])
suite.Equal(outgoingNodes["rg1"], int32(1))
for i := 1; i <= 100; i++ {
suite.manager.HandleNodeUp(int64(i))
suite.Equal(i, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg2").NodeNum())
}
}
func (suite *ResourceManagerSuite) TestAutoRecover() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg")
suite.NoError(err)
suite.manager.AssignNode(DefaultResourceGroupName, 1)
suite.manager.AssignNode(DefaultResourceGroupName, 2)
suite.manager.AssignNode("rg", 3)
suite.manager.HandleNodeDown(3)
lackNodes := suite.manager.CheckLackOfNode("rg")
suite.Equal(lackNodes, 1)
suite.manager.AutoRecoverResourceGroup("rg")
lackNodes = suite.manager.CheckLackOfNode("rg")
suite.Equal(lackNodes, 0)
// test auto recover behavior when all nodes are down
suite.manager.nodeMgr.Remove(1)
suite.manager.nodeMgr.Remove(2)
suite.manager.AutoRecoverResourceGroup("rg")
nodes, _ := suite.manager.GetNodes("rg")
suite.Len(nodes, 0)
nodes, _ = suite.manager.GetNodes(DefaultResourceGroupName)
suite.Len(nodes, 0)
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.HandleNodeUp(1)
suite.manager.AutoRecoverResourceGroup("rg")
nodes, _ = suite.manager.GetNodes("rg")
suite.Len(nodes, 1)
nodes, _ = suite.manager.GetNodes(DefaultResourceGroupName)
suite.Len(nodes, 0)
}
func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
for i := 0; i < 10; i++ {
for i := 1; i <= 100; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "127.0.0.1:0",
Address: "localhost",
Hostname: "localhost",
}))
suite.manager.HandleNodeUp(int64(i))
}
defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 0)
suite.Equal(100, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.manager.HandleNodeUp(1)
suite.manager.HandleNodeUp(2)
suite.manager.HandleNodeUp(3)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 3)
// Recover 10 nodes from default resource group
suite.manager.AddResourceGroup("rg1", newResourceGroupConfig(10, 30))
suite.Zero(suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg1").MissingNumOfNodes())
suite.Equal(100, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.manager.AutoRecoverResourceGroup("rg1")
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg1").MissingNumOfNodes())
suite.Equal(90, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// shutdown node 1 and 2
suite.manager.nodeMgr.Remove(1)
suite.manager.nodeMgr.Remove(2)
// Recover 20 nodes from default resource group
suite.manager.AddResourceGroup("rg2", newResourceGroupConfig(20, 30))
suite.Zero(suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup("rg2").MissingNumOfNodes())
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(90, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.manager.AutoRecoverResourceGroup("rg2")
suite.Equal(20, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(70, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
defaultRG, err = suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 1)
// Recover 5 redundant nodes from resource group
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(5, 5),
})
suite.manager.AutoRecoverResourceGroup("rg1")
suite.Equal(20, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(5, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(75, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
suite.manager.HandleNodeUp(4)
suite.manager.HandleNodeUp(5)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 3)
// Recover 10 redundant nodes from resource group 2 to resource group 1 and default resource group.
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(10, 20),
"rg2": newResourceGroupConfig(5, 10),
})
suite.manager.HandleNodeUp(7)
suite.manager.HandleNodeUp(8)
suite.manager.HandleNodeUp(9)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 6)
}
suite.manager.AutoRecoverResourceGroup("rg2")
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(80, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
func (suite *ResourceManagerSuite) TestStoreFailed() {
store := mocks.NewQueryCoordCatalog(suite.T())
nodeMgr := session.NewNodeManager()
manager := NewResourceManager(store, nodeMgr)
// recover redundant nodes from default resource group
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": newResourceGroupConfig(10, 20),
"rg2": newResourceGroupConfig(20, 30),
DefaultResourceGroupName: newResourceGroupConfig(10, 20),
})
suite.manager.AutoRecoverResourceGroup("rg1")
suite.manager.AutoRecoverResourceGroup("rg2")
suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
storeErr := errors.New("store error")
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(storeErr)
store.EXPECT().RemoveResourceGroup(mock.Anything).Return(storeErr)
// Even though the default resource group has a 20-node limit,
// all redundant nodes will be assigned to the default resource group.
suite.Equal(20, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(30, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(50, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
err := manager.AddResourceGroup("rg")
suite.ErrorIs(err, storeErr)
// Test recovering missing nodes from a high-priority resource group by setting `from`.
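// The `TransferFrom` entry below declares that rg3 should preferentially take its missing nodes from rg1 before falling back to other groups.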
suite.manager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 15,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 15,
},
TransferFrom: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg1",
}},
})
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
DefaultResourceGroupName: newResourceGroupConfig(30, 40),
})
manager.groups["rg"] = &ResourceGroup{
nodes: typeutil.NewConcurrentSet[int64](),
capacity: 0,
suite.manager.AutoRecoverResourceGroup("rg1")
suite.manager.AutoRecoverResourceGroup("rg2")
suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
suite.manager.AutoRecoverResourceGroup("rg3")
// Get 10 from default group for redundant nodes, get 5 from rg1 for rg3 at high priority.
suite.Equal(15, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(30, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(15, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(40, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// Test recovering redundant nodes to a high-priority resource group by setting `to`.
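// The `TransferTo` entry below declares that rg3's redundant nodes should preferentially be handed over to rg2.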
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg3": {
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 0,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 0,
},
TransferTo: []*rgpb.ResourceGroupTransfer{{
ResourceGroup: "rg2",
}},
},
"rg1": newResourceGroupConfig(15, 100),
"rg2": newResourceGroupConfig(15, 40),
})
suite.manager.AutoRecoverResourceGroup("rg1")
suite.manager.AutoRecoverResourceGroup("rg2")
suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
suite.manager.AutoRecoverResourceGroup("rg3")
// Recover rg3 by transferring 10 nodes to rg2 with high priority and 5 to rg1.
suite.Equal(20, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(40, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(40, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// Test down all nodes.
for i := 1; i <= 100; i++ {
suite.manager.nodeMgr.Remove(int64(i))
}
err = manager.RemoveResourceGroup("rg")
suite.ErrorIs(err, storeErr)
err = manager.AssignNode("rg", 1)
suite.ErrorIs(err, storeErr)
manager.groups["rg"].assignNode(1, 1)
err = manager.UnassignNode("rg", 1)
suite.ErrorIs(err, storeErr)
_, err = manager.TransferNode("rg", DefaultResourceGroupName, 1)
suite.ErrorIs(err, storeErr)
_, err = manager.HandleNodeUp(2)
suite.ErrorIs(err, storeErr)
_, err = manager.HandleNodeDown(1)
suite.ErrorIs(err, storeErr)
}
func (suite *ResourceManagerSuite) TearDownSuite() {
suite.kv.Close()
}
func TestResourceManager(t *testing.T) {
suite.Run(t, new(ResourceManagerSuite))
suite.manager.RemoveAllDownNode()
suite.Zero(suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Zero(suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Zero(suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
}
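The tests above build declarative configs through a newResourceGroupConfig helper that is defined elsewhere in this file and not shown in this hunk. Judging from the explicit rgpb configs used later in this commit, it presumably looks roughly like the following sketch (the helper body is an assumption, not the actual source):

func newResourceGroupConfig(request int32, limit int32) *rgpb.ResourceGroupConfig {
    // request is the desired node count, limit is the maximum node count (assumed semantics).
    return &rgpb.ResourceGroupConfig{
        Requests: &rgpb.ResourceGroupLimit{NodeNum: request},
        Limits:   &rgpb.ResourceGroupLimit{NodeNum: limit},
    }
}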

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
// check replicas, find read-only nodes and remove them from the replica once all segments/channels have been moved away
@ -67,20 +68,26 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) {
defer ob.wg.Done()
log.Info("Start check replica loop")
ticker := time.NewTicker(params.Params.QueryCoordCfg.CheckNodeInReplicaInterval.GetAsDuration(time.Second))
defer ticker.Stop()
listener := ob.meta.ResourceManager.ListenNodeChanged()
for {
select {
case <-ctx.Done():
log.Info("Close replica observer")
ob.waitNodeChangedOrTimeout(ctx, listener)
// stop if the context is canceled.
if ctx.Err() != nil {
log.Info("Stop check replica observer")
return
case <-ticker.C:
ob.checkNodesInReplica()
}
// do check once.
ob.checkNodesInReplica()
}
}
func (ob *ReplicaObserver) waitNodeChangedOrTimeout(ctx context.Context, listener *syncutil.VersionedListener) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, params.Params.QueryCoordCfg.CheckNodeInReplicaInterval.GetAsDuration(time.Second))
defer cancel()
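// Block until the resource manager signals a node-membership change or the check interval elapses,
// so the observer reacts to node events promptly while still performing periodic checks.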
listener.Wait(ctxWithTimeout)
}
func (ob *ReplicaObserver) checkNodesInReplica() {
log := log.Ctx(context.Background()).WithRateGroup("qcv2.replicaObserver", 1, 60)
collections := ob.meta.GetAll()

View File

@ -21,10 +21,10 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
@ -82,8 +82,14 @@ func (suite *ReplicaObserverSuite) SetupTest() {
}
func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
suite.meta.ResourceManager.AddResourceGroup("rg1")
suite.meta.ResourceManager.AddResourceGroup("rg2")
suite.meta.ResourceManager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 2},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 2},
})
suite.meta.ResourceManager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 2},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 2},
})
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost:8080",
@ -104,60 +110,86 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
Address: "localhost:8080",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(1)
suite.meta.ResourceManager.HandleNodeUp(2)
suite.meta.ResourceManager.HandleNodeUp(3)
suite.meta.ResourceManager.HandleNodeUp(4)
err := suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 2))
suite.NoError(err)
replicas := make([]*meta.Replica, 2)
replicas[0] = meta.NewReplica(
&querypb.Replica{
ID: 10000,
CollectionID: suite.collectionID,
ResourceGroup: "rg1",
Nodes: []int64{1, 2, 3},
},
typeutil.NewUniqueSet(1, 2, 3),
)
replicas[1] = meta.NewReplica(
&querypb.Replica{
ID: 10001,
CollectionID: suite.collectionID,
ResourceGroup: "rg2",
Nodes: []int64{4},
},
typeutil.NewUniqueSet(4),
)
err = suite.meta.ReplicaManager.Put(replicas...)
replicas, err := suite.meta.Spawn(suite.collectionID, map[string]int{
"rg1": 1,
"rg2": 1,
})
suite.NoError(err)
suite.distMgr.ChannelDistManager.Update(1, utils.CreateTestChannel(suite.collectionID, 1, 1, "test-insert-channel1"))
suite.distMgr.SegmentDistManager.Update(1, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 1, 1, 1, "test-insert-channel1"))
suite.distMgr.ChannelDistManager.Update(2, utils.CreateTestChannel(suite.collectionID, 2, 1, "test-insert-channel2"))
suite.distMgr.SegmentDistManager.Update(2, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 2, 2, 1, "test-insert-channel2"))
suite.distMgr.ChannelDistManager.Update(3, utils.CreateTestChannel(suite.collectionID, 3, 1, "test-insert-channel3"))
suite.distMgr.SegmentDistManager.Update(3, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 2, 3, 1, "test-insert-channel3"))
suite.Equal(2, len(replicas))
suite.Eventually(func() bool {
availableNodes := typeutil.NewUniqueSet()
for _, r := range replicas {
replica := suite.meta.ReplicaManager.Get(r.GetID())
suite.NotNil(replica)
if replica.RWNodesCount() != 2 {
return false
}
if replica.RONodesCount() != 0 {
return false
}
availableNodes.Insert(replica.GetNodes()...)
}
return availableNodes.Len() == 4
}, 6*time.Second, 2*time.Second)
suite.distMgr.ChannelDistManager.Update(3)
suite.distMgr.SegmentDistManager.Update(3)
// Add some segment on nodes.
for i := 0; i < 4; i++ {
suite.distMgr.ChannelDistManager.Update(
int64(i),
utils.CreateTestChannel(suite.collectionID, 1, 1, "test-insert-channel1"))
suite.distMgr.SegmentDistManager.Update(
int64(i),
utils.CreateTestSegment(suite.collectionID, suite.partitionID, 1, 1, 1, "test-insert-channel1"))
}
// Do a replica transfer.
suite.meta.ReplicaManager.TransferReplica(suite.collectionID, "rg1", "rg2", 1)
// All replicas should be in rg2, not rg1,
// and some nodes will become ro nodes until all segments and channels on them are cleaned up.
suite.Eventually(func() bool {
for _, r := range replicas {
replica := suite.meta.ReplicaManager.Get(r.GetID())
suite.NotNil(replica)
suite.Equal("rg2", replica.GetResourceGroup())
// every replica should have ro nodes:
// the transferred replica should have 2 ro nodes,
// the non-transferred replica should have 1 ro node for balancing.
if !(replica.RONodesCount()+replica.RWNodesCount() == 2 && replica.RONodesCount() > 0) {
return false
}
}
return true
}, 30*time.Second, 2*time.Second)
// Clear the segments and channels on nodes.
for i := 0; i < 4; i++ {
suite.distMgr.ChannelDistManager.Update(int64(i))
suite.distMgr.SegmentDistManager.Update(int64(i))
}
suite.Eventually(func() bool {
for _, r := range replicas {
replica := suite.meta.ReplicaManager.Get(r.GetID())
suite.NotNil(replica)
suite.Equal("rg2", replica.GetResourceGroup())
if replica.RONodesCount() > 0 {
return false
}
if replica.RWNodesCount() != 1 {
return false
}
}
return true
}, 30*time.Second, 2*time.Second)
}
func (suite *ReplicaObserverSuite) TearDownSuite() {

View File

@ -27,9 +27,11 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
// ResourceObserver is used to observe resource group status.
// It recovers resource groups into their expected configuration.
type ResourceObserver struct {
cancel context.CancelFunc
wg sync.WaitGroup
@ -65,43 +67,56 @@ func (ob *ResourceObserver) schedule(ctx context.Context) {
defer ob.wg.Done()
log.Info("Start check resource group loop")
ticker := time.NewTicker(params.Params.QueryCoordCfg.CheckResourceGroupInterval.GetAsDuration(time.Second))
defer ticker.Stop()
listener := ob.meta.ResourceManager.ListenResourceGroupChanged()
for {
select {
case <-ctx.Done():
ob.waitRGChangedOrTimeout(ctx, listener)
// stop if the context is canceled.
if ctx.Err() != nil {
log.Info("Close resource group observer")
return
case <-ticker.C:
ob.checkResourceGroup()
}
// do check once.
ob.checkAndRecoverResourceGroup()
}
}
func (ob *ResourceObserver) checkResourceGroup() {
func (ob *ResourceObserver) waitRGChangedOrTimeout(ctx context.Context, listener *syncutil.VersionedListener) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, params.Params.QueryCoordCfg.CheckResourceGroupInterval.GetAsDuration(time.Second))
defer cancel()
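// Wake up on a resource-group change notification, or fall back to the periodic check interval as a timeout.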
listener.Wait(ctxWithTimeout)
}
func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
manager := ob.meta.ResourceManager
rgNames := manager.ListResourceGroups()
enableRGAutoRecover := params.Params.QueryCoordCfg.EnableRGAutoRecover.GetAsBool()
log.Info("start to check resource group", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
// Check if there is any incoming node.
if manager.CheckIncomingNodeNum() > 0 {
log.Info("new incoming node is ready to be assigned...", zap.Int("incomingNodeNum", manager.CheckIncomingNodeNum()))
manager.AssignPendingIncomingNode()
}
// Remove all down nodes in resource group manager.
log.Info("remove all down nodes in resource group manager...")
ob.meta.RemoveAllDownNode()
log.Info("recover resource groups...")
// Recover all resource groups into their expected configuration.
for _, rgName := range rgNames {
if rgName == meta.DefaultResourceGroupName {
continue
}
lackNodeNum := manager.CheckLackOfNode(rgName)
if lackNodeNum > 0 {
log.Info("found resource group lack of nodes",
if err := manager.MeetRequirement(rgName); err != nil {
log.Info("found resource group need to be recovered",
zap.String("rgName", rgName),
zap.Int("lackNodeNum", lackNodeNum),
zap.String("reason", err.Error()),
)
if enableRGAutoRecover {
nodes, err := manager.AutoRecoverResourceGroup(rgName)
err := manager.AutoRecoverResourceGroup(rgName)
if err != nil {
log.Warn("failed to recover resource group",
zap.String("rgName", rgName),
zap.Int("lackNodeNum", lackNodeNum-len(nodes)),
zap.Error(err),
)
}
@ -111,4 +126,5 @@ func (ob *ResourceObserver) checkResourceGroup() {
if enableRGAutoRecover {
utils.RecoverAllCollection(ob.meta)
}
log.Info("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
}

View File

@ -17,12 +17,12 @@ package observers
import (
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/kv"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/mocks"
@ -79,16 +79,22 @@ func (suite *ResourceObserverSuite) SetupTest() {
suite.observer.Start()
suite.store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
for i := 0; i < 10; i++ {
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(int64(i))
}
}
func (suite *ResourceObserverSuite) TearDownTest() {
suite.observer.Stop()
suite.store.ExpectedCalls = nil
}
func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil)
@ -113,7 +119,10 @@ func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
},
typeutil.NewUniqueSet(),
))
suite.meta.ResourceManager.AddResourceGroup("rg")
suite.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 4},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 4},
})
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(100),
Address: "localhost",
@ -134,49 +143,37 @@ func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg", 100)
suite.meta.ResourceManager.AssignNode("rg", 101)
suite.meta.ResourceManager.AssignNode("rg", 102)
suite.meta.ResourceManager.AssignNode("rg", 103)
suite.meta.ResourceManager.HandleNodeUp(100)
suite.meta.ResourceManager.HandleNodeUp(101)
suite.meta.ResourceManager.HandleNodeUp(102)
suite.meta.ResourceManager.HandleNodeUp(103)
suite.meta.ResourceManager.HandleNodeDown(100)
suite.meta.ResourceManager.HandleNodeDown(101)
suite.Error(suite.meta.ResourceManager.MeetRequirement("rg"))
}
func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() {
suite.meta.ResourceManager.AddResourceGroup("rg")
suite.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 50},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 50},
})
for i := 100; i < 200; i++ {
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg", int64(i))
suite.meta.ResourceManager.HandleNodeUp(int64(i))
suite.meta.ResourceManager.HandleNodeDown(int64(i))
}
suite.Error(suite.meta.ResourceManager.MeetRequirement("rg"))
}
func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil)
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
@ -199,8 +196,11 @@ func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
typeutil.NewUniqueSet(),
))
suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(errors.New("store error"))
suite.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 4},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 4},
})
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(100),
Address: "localhost",
@ -221,26 +221,14 @@ func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg", 100)
suite.meta.ResourceManager.AssignNode("rg", 101)
suite.meta.ResourceManager.AssignNode("rg", 102)
suite.meta.ResourceManager.AssignNode("rg", 103)
suite.meta.ResourceManager.HandleNodeUp(100)
suite.meta.ResourceManager.HandleNodeUp(101)
suite.meta.ResourceManager.HandleNodeUp(102)
suite.meta.ResourceManager.HandleNodeUp(103)
suite.meta.ResourceManager.HandleNodeDown(100)
suite.meta.ResourceManager.HandleNodeDown(101)
suite.Error(suite.meta.ResourceManager.MeetRequirement("rg"))
}
func (suite *ResourceObserverSuite) TearDownSuite() {

View File

@ -589,7 +589,7 @@ func (suite *OpsServiceSuite) TestTransferSegment() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(node)
}
// test successful segment transfer, expect 1 balance segment task to be generated
@ -807,7 +807,7 @@ func (suite *OpsServiceSuite) TestTransferChannel() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(node)
}
// test successful channel transfer, expect 1 balance channel task to be generated

View File

@ -50,7 +50,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
@ -712,24 +711,10 @@ func (s *Server) tryHandleNodeUp() {
}
func (s *Server) handleNodeUp(node int64) {
log := log.With(zap.Int64("nodeID", node))
s.taskScheduler.AddExecutor(node)
s.distController.StartDistInstance(s.ctx, node)
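// Resource group assignment and replica recovery for the new node are now driven asynchronously by the resource/replica observers instead of being done inline here.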
s.meta.ResourceManager.HandleNodeUp(node)
}
func (s *Server) handleNodeDown(node int64) {
@ -763,18 +748,7 @@ func (s *Server) handleNodeDown(node int64) {
// Clear tasks
s.taskScheduler.RemoveByNode(node)
s.meta.ResourceManager.HandleNodeDown(node)
}
// checkReplicas checks whether replica contains offline node, and remove those nodes

View File

@ -143,7 +143,7 @@ func (suite *ServerSuite) SetupTest() {
suite.Require().NoError(err)
ok := suite.waitNodeUp(suite.nodes[i], 5*time.Second)
suite.Require().True(ok)
suite.server.meta.ResourceManager.HandleNodeUp(suite.nodes[i].ID)
suite.expectLoadAndReleasePartitions(suite.nodes[i])
}

View File

@ -1022,7 +1022,7 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
return merr.Status(err), nil
}
err := s.meta.ResourceManager.AddResourceGroup(req.GetResourceGroup(), req.GetConfig())
if err != nil {
log.Warn("failed to create resource group", zap.Error(err))
return merr.Status(err), nil
@ -1030,6 +1030,25 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
return merr.Success(), nil
}
func (s *Server) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.Any("rgName", req.GetResourceGroups()),
)
log.Info("update resource group request received")
if err := merr.CheckHealthy(s.State()); err != nil {
log.Warn("failed to update resource group", zap.Error(err))
return merr.Status(err), nil
}
err := s.meta.ResourceManager.UpdateResourceGroups(req.GetResourceGroups())
if err != nil {
log.Warn("failed to update resource group", zap.Error(err))
return merr.Status(err), nil
}
return merr.Success(), nil
}
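A caller drives this handler with one declarative config per resource group in a single request. A minimal client-side sketch (assuming the request field is named ResourceGroups, matching the GetResourceGroups getter used above; queryCoordClient is a hypothetical client):

req := &querypb.UpdateResourceGroupsRequest{
    ResourceGroups: map[string]*rgpb.ResourceGroupConfig{
        "rg1": {
            Requests: &rgpb.ResourceGroupLimit{NodeNum: 2},
            Limits:   &rgpb.ResourceGroupLimit{NodeNum: 4},
        },
    },
}
// The returned status carries merr-encoded errors; merr.Ok(status) reports whether the update was accepted.
status, err := queryCoordClient.UpdateResourceGroups(ctx, req)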
func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.String("rgName", req.GetResourceGroup()),
@ -1056,6 +1075,7 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
return merr.Success(), nil
}
// go:deprecated TransferNode transfers nodes between resource groups.
func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.String("source", req.GetSourceResourceGroup()),
@ -1085,8 +1105,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
}
// Move node from source resource group to target resource group.
if err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode())); err != nil {
log.Warn("failed to transfer node", zap.Error(err))
return merr.Status(err), nil
}
@ -1164,8 +1183,9 @@ func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.Describ
return resp, nil
}
rg := s.meta.ResourceManager.GetResourceGroup(req.GetResourceGroup())
if rg == nil {
err := merr.WrapErrResourceGroupNotFound(req.GetResourceGroup())
resp.Status = merr.Status(err)
return resp, nil
}
@ -1198,13 +1218,28 @@ func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.Describ
}
}
nodes := make([]*commonpb.NodeInfo, 0, len(rg.GetNodes()))
for _, nodeID := range rg.GetNodes() {
nodeSessionInfo := s.nodeMgr.Get(nodeID)
// Filter offline nodes and nodes in stopping state
if nodeSessionInfo != nil && !nodeSessionInfo.IsStoppingState() {
nodes = append(nodes, &commonpb.NodeInfo{
NodeId: nodeSessionInfo.ID(),
Address: nodeSessionInfo.Addr(),
Hostname: nodeSessionInfo.Hostname(),
})
}
}
resp.ResourceGroup = &querypb.ResourceGroupInfo{
Name: req.GetResourceGroup(),
Capacity: int32(rg.GetCapacity()),
NumAvailableNode: int32(len(nodes)),
NumLoadedReplica: loadedReplicas,
NumOutgoingNode: outgoingNodes,
NumIncomingNode: incomingNodes,
Config: rg.GetConfig(),
Nodes: nodes,
}
return resp, nil
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
@ -158,8 +159,7 @@ func (suite *ServiceSuite) SetupTest() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(node)
}
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
@ -371,8 +371,19 @@ func (suite *ServiceSuite) TestResourceGroup() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
// creating a duplicate resource group with the same config is ok.
resp, err = server.CreateResourceGroup(ctx, createRG)
suite.NoError(err)
suite.True(merr.Ok(resp))
resp, err = server.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: "rg1",
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 10000},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 10000},
},
})
suite.NoError(err)
suite.False(merr.Ok(resp))
listRG := &milvuspb.ListResourceGroupsRequest{}
@ -401,12 +412,18 @@ func (suite *ServiceSuite) TestResourceGroup() {
Address: "localhost",
Hostname: "localhost",
}))
server.meta.ResourceManager.AddResourceGroup("rg11")
server.meta.ResourceManager.AssignNode("rg11", 1011)
server.meta.ResourceManager.AssignNode("rg11", 1012)
server.meta.ResourceManager.AddResourceGroup("rg12")
server.meta.ResourceManager.AssignNode("rg12", 1013)
server.meta.ResourceManager.AssignNode("rg12", 1014)
server.meta.ResourceManager.AddResourceGroup("rg11", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 2},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 2},
})
server.meta.ResourceManager.HandleNodeUp(1011)
server.meta.ResourceManager.HandleNodeUp(1012)
server.meta.ResourceManager.AddResourceGroup("rg12", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 2},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 2},
})
server.meta.ResourceManager.HandleNodeUp(1013)
server.meta.ResourceManager.HandleNodeUp(1014)
server.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
server.meta.CollectionManager.PutCollection(utils.CreateTestCollection(2, 1))
server.meta.ReplicaManager.Put(meta.NewReplica(&querypb.Replica{
@ -504,9 +521,19 @@ func (suite *ServiceSuite) TestTransferNode() {
ctx := context.Background()
server := suite.server
err := server.meta.ResourceManager.AddResourceGroup("rg1")
server.resourceObserver = observers.NewResourceObserver(server.meta)
server.resourceObserver.Start()
defer server.resourceObserver.Stop()
err := server.meta.ResourceManager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 0},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 0},
})
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg2")
err = server.meta.ResourceManager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 0},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 0},
})
suite.NoError(err)
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
suite.meta.ReplicaManager.Put(meta.NewReplica(
@ -526,6 +553,8 @@ func (suite *ServiceSuite) TestTransferNode() {
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
time.Sleep(100 * time.Millisecond)
nodes, err := server.meta.ResourceManager.GetNodes("rg1")
suite.NoError(err)
suite.Len(nodes, 1)
@ -558,9 +587,15 @@ func (suite *ServiceSuite) TestTransferNode() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
err = server.meta.ResourceManager.AddResourceGroup("rg3")
err = server.meta.ResourceManager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 4},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 4},
})
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg4")
err = server.meta.ResourceManager.AddResourceGroup("rg4", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 0},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 0},
})
suite.NoError(err)
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 11,
@ -582,10 +617,10 @@ func (suite *ServiceSuite) TestTransferNode() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg3", 11)
suite.meta.ResourceManager.AssignNode("rg3", 12)
suite.meta.ResourceManager.AssignNode("rg3", 13)
suite.meta.ResourceManager.AssignNode("rg3", 14)
suite.meta.ResourceManager.HandleNodeUp(11)
suite.meta.ResourceManager.HandleNodeUp(12)
suite.meta.ResourceManager.HandleNodeUp(13)
suite.meta.ResourceManager.HandleNodeUp(14)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg3",
@ -594,6 +629,8 @@ func (suite *ServiceSuite) TestTransferNode() {
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
time.Sleep(100 * time.Millisecond)
nodes, err = server.meta.ResourceManager.GetNodes("rg3")
suite.NoError(err)
suite.Len(nodes, 1)
@ -631,11 +668,20 @@ func (suite *ServiceSuite) TestTransferReplica() {
ctx := context.Background()
server := suite.server
err := server.meta.ResourceManager.AddResourceGroup("rg1")
err := server.meta.ResourceManager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 1},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 1},
})
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg2")
err = server.meta.ResourceManager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 1},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 1},
})
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg3")
err = server.meta.ResourceManager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 3},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 3},
})
suite.NoError(err)
resp, err := suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
@ -715,11 +761,11 @@ func (suite *ServiceSuite) TestTransferReplica() {
Address: "localhost",
Hostname: "localhost",
}))
suite.server.meta.AssignNode("rg1", 1001)
suite.server.meta.AssignNode("rg2", 1002)
suite.server.meta.AssignNode("rg3", 1003)
suite.server.meta.AssignNode("rg3", 1004)
suite.server.meta.AssignNode("rg3", 1005)
suite.server.meta.HandleNodeUp(1001)
suite.server.meta.HandleNodeUp(1002)
suite.server.meta.HandleNodeUp(1003)
suite.server.meta.HandleNodeUp(1004)
suite.server.meta.HandleNodeUp(1005)
suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
CollectionID: 2,
@ -1679,6 +1725,18 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
}
func (suite *ServiceSuite) TestHandleNodeUp() {
suite.server.replicaObserver = observers.NewReplicaObserver(
suite.server.meta,
suite.server.dist,
)
suite.server.resourceObserver = observers.NewResourceObserver(
suite.server.meta,
)
suite.server.replicaObserver.Start()
defer suite.server.replicaObserver.Stop()
suite.server.resourceObserver.Start()
defer suite.server.resourceObserver.Stop()
server := suite.server
suite.server.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
suite.server.meta.ReplicaManager.Put(meta.NewReplica(
@ -1700,19 +1758,26 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
Hostname: "localhost",
}))
server.handleNodeUp(111)
// wait for async update by observer
time.Sleep(100 * time.Millisecond)
nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes()
nodesInRG, _ := suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.ElementsMatch(nodes, nodesInRG)
log.Info("handleNodeUp")
// when more rgs exist, a new node shouldn't be assigned to a replica in the default rg by handleNodeUp
suite.server.meta.ResourceManager.AddResourceGroup("rg")
suite.server.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 1},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 1},
})
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 222,
Address: "localhost",
Hostname: "localhost",
}))
server.handleNodeUp(222)
// wait for async update by observer
time.Sleep(100 * time.Millisecond)
nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes()
nodesInRG, _ = suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.ElementsMatch(nodes, nodesInRG)

View File

@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
@ -51,9 +52,19 @@ func TestSpawnReplicasWithRG(t *testing.T) {
store := querycoord.NewCatalog(kv)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg1")
m.ResourceManager.AddResourceGroup("rg2")
m.ResourceManager.AddResourceGroup("rg3")
m.ResourceManager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 3},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 3},
})
m.ResourceManager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 3},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 3},
})
m.ResourceManager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 3},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 3},
})
for i := 1; i < 10; i++ {
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
@ -61,13 +72,13 @@ func TestSpawnReplicasWithRG(t *testing.T) {
Hostname: "localhost",
}))
if i%3 == 0 {
m.ResourceManager.AssignNode("rg1", int64(i))
m.ResourceManager.HandleNodeUp(int64(i))
}
if i%3 == 1 {
m.ResourceManager.AssignNode("rg2", int64(i))
m.ResourceManager.HandleNodeUp(int64(i))
}
if i%3 == 2 {
m.ResourceManager.AssignNode("rg3", int64(i))
m.ResourceManager.HandleNodeUp(int64(i))
}
}
@ -130,7 +141,10 @@ func TestAddNodesToCollectionsInRGFailed(t *testing.T) {
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg")
m.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 0},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 0},
})
m.CollectionManager.PutCollection(CreateTestCollection(1, 2))
m.CollectionManager.PutCollection(CreateTestCollection(2, 2))
m.ReplicaManager.Put(meta.NewReplica(
@ -194,7 +208,10 @@ func TestAddNodesToCollectionsInRG(t *testing.T) {
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg")
m.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 4},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 4},
})
m.CollectionManager.PutCollection(CreateTestCollection(1, 2))
m.CollectionManager.PutCollection(CreateTestCollection(2, 2))
m.ReplicaManager.Put(meta.NewReplica(
@ -236,32 +253,15 @@ func TestAddNodesToCollectionsInRG(t *testing.T) {
},
typeutil.NewUniqueSet(),
))
for i := 1; i < 5; i++ {
nodeID := int64(i)
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodeID,
Address: "127.0.0.1",
Hostname: "localhost",
}))
m.ResourceManager.HandleNodeUp(nodeID)
}
RecoverAllCollection(m)
assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 2)

View File

@ -110,6 +110,10 @@ func (m *GrpcQueryCoordClient) CreateResourceGroup(ctx context.Context, req *mil
return &commonpb.Status{}, m.Err
}
func (m *GrpcQueryCoordClient) UpdateResourceGroups(ctx context.Context, req *querypb.UpdateResourceGroupsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *GrpcQueryCoordClient) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}

View File

@ -122,6 +122,7 @@ var (
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeManageOwnership.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeCreateResourceGroup.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeUpdateResourceGroups.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDropResourceGroup.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDescribeResourceGroup.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeListResourceGroups.String()),

View File

@ -61,7 +61,13 @@ var (
ErrGeneralCapacityExceeded = newMilvusError("general capacity exceeded", 250, false)
// ResourceGroup related
ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false)
ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false)
ErrResourceGroupAlreadyExist = newMilvusError("resource group already exist, but create with different config", 301, false)
ErrResourceGroupReachLimit = newMilvusError("resource group num reach limit", 302, false)
ErrResourceGroupIllegalConfig = newMilvusError("resource group illegal config", 303, false)
// go:deprecated
ErrResourceGroupNodeNotEnough = newMilvusError("resource group node not enough", 304, false)
ErrResourceGroupServiceAvailable = newMilvusError("resource group service available", 305, true)
// Replica related
ErrReplicaNotFound = newMilvusError("replica not found", 400, false)

View File

@ -94,6 +94,11 @@ func (s *ErrSuite) TestWrap() {
// ResourceGroup related
s.ErrorIs(WrapErrResourceGroupNotFound("test_ResourceGroup", "failed to get ResourceGroup"), ErrResourceGroupNotFound)
s.ErrorIs(WrapErrResourceGroupAlreadyExist("test_ResourceGroup", "failed to get ResourceGroup"), ErrResourceGroupAlreadyExist)
s.ErrorIs(WrapErrResourceGroupReachLimit("test_ResourceGroup", 1, "failed to get ResourceGroup"), ErrResourceGroupReachLimit)
s.ErrorIs(WrapErrResourceGroupIllegalConfig("test_ResourceGroup", nil, "failed to get ResourceGroup"), ErrResourceGroupIllegalConfig)
s.ErrorIs(WrapErrResourceGroupNodeNotEnough("test_ResourceGroup", 1, 2, "failed to get ResourceGroup"), ErrResourceGroupNodeNotEnough)
s.ErrorIs(WrapErrResourceGroupServiceAvailable("test_ResourceGroup", "failed to get ResourceGroup"), ErrResourceGroupServiceAvailable)
// Replica related
s.ErrorIs(WrapErrReplicaNotFound(1, "failed to get replica"), ErrReplicaNotFound)

View File

@ -555,6 +555,52 @@ func WrapErrResourceGroupNotFound(rg any, msg ...string) error {
return err
}
// WrapErrResourceGroupAlreadyExist wraps ErrResourceGroupAlreadyExist with resource group
func WrapErrResourceGroupAlreadyExist(rg any, msg ...string) error {
err := wrapFields(ErrResourceGroupAlreadyExist, value("rg", rg))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
// WrapErrResourceGroupReachLimit wraps ErrResourceGroupReachLimit with resource group and limit
func WrapErrResourceGroupReachLimit(rg any, limit any, msg ...string) error {
err := wrapFields(ErrResourceGroupReachLimit, value("rg", rg), value("limit", limit))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
// WrapErrResourceGroupIllegalConfig wraps ErrResourceGroupIllegalConfig with resource group and config
func WrapErrResourceGroupIllegalConfig(rg any, cfg any, msg ...string) error {
err := wrapFields(ErrResourceGroupIllegalConfig, value("rg", rg), value("config", cfg))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
// Deprecated: superseded by the declarative resource group config.
// WrapErrResourceGroupNodeNotEnough wraps ErrResourceGroupNodeNotEnough with resource group
func WrapErrResourceGroupNodeNotEnough(rg any, current any, expected any, msg ...string) error {
err := wrapFields(ErrResourceGroupNodeNotEnough, value("rg", rg), value("currentNodeNum", current), value("expectedNodeNum", expected))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
// WrapErrResourceGroupServiceAvailable wraps ErrResourceGroupServiceAvailable with an extra message
func WrapErrResourceGroupServiceAvailable(msg ...string) error {
err := wrapFields(ErrResourceGroupServiceAvailable)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
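A quick caller-side sketch of how these wrappers compose (the package and helpers below are hypothetical and not part of this patch; only merr and its sentinels come from it):
package rgquota
import (
	"errors"
	"github.com/milvus-io/milvus/pkg/util/merr"
)
// CheckRGQuota is a hypothetical helper that rejects creating another resource
// group once a limit is reached, reusing the wrapper defined above.
func CheckRGQuota(current, limit int, name string) error {
	if current >= limit {
		return merr.WrapErrResourceGroupReachLimit(name, limit, "create resource group failed")
	}
	return nil
}
// IsRGQuotaError shows that the sentinel still matches after wrapping,
// which is what the ErrSuite above asserts via s.ErrorIs.
func IsRGQuotaError(err error) bool {
	return errors.Is(err, merr.ErrResourceGroupReachLimit)
}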
// Replica related
func WrapErrReplicaNotFound(id int64, msg ...string) error {
err := wrapFields(ErrReplicaNotFound, value("replica", id))

View File

@ -122,12 +122,13 @@ type quotaConfig struct {
DQLMinQueryRatePerPartition ParamItem `refreshable:"true"`
// limits
MaxCollectionNum ParamItem `refreshable:"true"`
MaxCollectionNumPerDB ParamItem `refreshable:"true"`
TopKLimit ParamItem `refreshable:"true"`
NQLimit ParamItem `refreshable:"true"`
MaxQueryResultWindow ParamItem `refreshable:"true"`
MaxOutputSize ParamItem `refreshable:"true"`
MaxResourceGroupNumOfQueryNode ParamItem `refreshable:"true"`
// limit writing
ForceDenyWriting ParamItem `refreshable:"true"`
@ -1534,6 +1535,15 @@ Check https://milvus.io/docs/limitations.md for more details.`,
}
p.MaxOutputSize.Init(base.mgr)
p.MaxResourceGroupNumOfQueryNode = ParamItem{
Key: "quotaAndLimits.limits.maxResourceGroupNumOfQueryNode",
Version: "2.4.1",
Doc: `maximum number of resource groups of query nodes`,
DefaultValue: "1024",
Export: true,
}
p.MaxResourceGroupNumOfQueryNode.Init(base.mgr)
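Because the item is declared refreshable, a hypothetical enforcement point would read it at call time instead of caching it at startup; only the ParamItem and the merr wrapper come from this patch, the function and its arguments are illustrative:
package quotasketch
import (
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// enforceRGQuota consults the refreshable limit on every call, so a runtime
// change to quotaAndLimits.limits.maxResourceGroupNumOfQueryNode takes effect
// without a restart.
func enforceRGQuota(rgName string, currentNum int) error {
	limit := paramtable.Get().QuotaConfig.MaxResourceGroupNumOfQueryNode.GetAsInt()
	if currentNum >= limit {
		return merr.WrapErrResourceGroupReachLimit(rgName, limit)
	}
	return nil
}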
// limit writing
p.ForceDenyWriting = ParamItem{
Key: "quotaAndLimits.limitWriting.forceDeny",

View File

@ -177,6 +177,9 @@ func TestQuotaParam(t *testing.T) {
t.Run("test limits", func(t *testing.T) {
assert.Equal(t, 65536, qc.MaxCollectionNum.GetAsInt())
assert.Equal(t, 65536, qc.MaxCollectionNumPerDB.GetAsInt())
assert.Equal(t, 1024, params.QuotaConfig.MaxResourceGroupNumOfQueryNode.GetAsInt())
params.Save(params.QuotaConfig.MaxResourceGroupNumOfQueryNode.Key, "512")
assert.Equal(t, 512, params.QuotaConfig.MaxResourceGroupNumOfQueryNode.GetAsInt())
})
t.Run("test limit writing", func(t *testing.T) {

View File

@ -253,6 +253,14 @@ func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2,
return cluster, nil
}
func (cluster *MiniClusterV2) AddQueryNodes(k int) []*grpcquerynode.Server {
servers := make([]*grpcquerynode.Server, 0, k) // zero length with capacity k, so the appends below yield exactly k servers
for i := 0; i < k; i++ {
servers = append(servers, cluster.AddQueryNode())
}
return servers
}
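Usage in the rg suite below is simply s.Cluster.AddQueryNodes(3); a minimal sketch of an equivalent call site, assumed to live in this same file so the existing imports apply (the wrapper itself is hypothetical):
// mustAddQueryNodes is a hypothetical convenience wrapper: bring up n extra
// query nodes and report how many were created (equals n with the
// zero-length/capacity-k constructor above).
func (cluster *MiniClusterV2) mustAddQueryNodes(n int) int {
	return len(cluster.AddQueryNodes(n))
}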
func (cluster *MiniClusterV2) AddQueryNode() *grpcquerynode.Server {
cluster.ptmu.Lock()
defer cluster.ptmu.Unlock()

View File

@ -22,10 +22,8 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -60,63 +58,14 @@ func (s *ReplicaTestSuit) initCollection(collectionName string, replica int, cha
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schema := integration.ConstructSchema(collectionName, dim, true)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: int32(channelNum),
s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
DBName: dbName,
Dim: dim,
CollectionName: collectionName,
ChannelNum: channelNum,
SegmentNum: segmentNum,
RowNumPerSegment: segmentRowNum,
})
s.NoError(err)
s.True(merr.Ok(createCollectionStatus))
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.True(merr.Ok(showCollectionsResp.Status))
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
for i := 0; i < segmentNum; i++ {
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, segmentRowNum, dim)
hashKeys := integration.GenerateHashKeys(segmentRowNum)
insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(segmentRowNum),
})
s.NoError(err)
s.True(merr.Ok(insertResult.Status))
// flush
flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
s.NoError(err)
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
ids := segmentIDs.GetData()
s.Require().NotEmpty(segmentIDs)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
}
// create index
createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2),
})
s.NoError(err)
s.True(merr.Ok(createIndexStatus))
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
for i := 1; i < replica; i++ {
s.Cluster.AddQueryNode()

View File

@ -0,0 +1,352 @@
package rg
import (
"context"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
const (
DefaultResourceGroup = "__default_resource_group"
RecycleResourceGroup = "__recycle_resource_group"
)
type collectionConfig struct {
resourceGroups []string
createCfg *integration.CreateCollectionConfig
}
type resourceGroupConfig struct {
expectedNodeNum int
rgCfg *rgpb.ResourceGroupConfig
}
type ResourceGroupTestSuite struct {
integration.MiniClusterSuite
rgs map[string]*resourceGroupConfig
collections map[string]*collectionConfig
}
func (s *ResourceGroupTestSuite) SetupSuite() {
paramtable.Init()
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000")
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CheckNodeInReplicaInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1")
s.MiniClusterSuite.SetupSuite()
}
func (s *ResourceGroupTestSuite) TestResourceGroup() {
ctx := context.Background()
s.rgs = map[string]*resourceGroupConfig{
DefaultResourceGroup: {
expectedNodeNum: 1,
rgCfg: newRGConfig(1, 1),
},
RecycleResourceGroup: {
expectedNodeNum: 0,
rgCfg: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 0,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10000,
},
},
},
"rg1": {
expectedNodeNum: 0,
rgCfg: newRGConfig(0, 0),
},
"rg2": {
expectedNodeNum: 0,
rgCfg: newRGConfig(0, 0),
},
}
s.initResourceGroup(ctx)
s.assertResourceGroup(ctx)
// only one query node is running, so the default rg cannot yet satisfy the new request for two nodes
s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 2
s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
s.syncResourceConfig(ctx)
s.assertResourceGroup(ctx)
s.rgs[DefaultResourceGroup].expectedNodeNum = 2
s.Cluster.AddQueryNode()
s.syncResourceConfig(ctx)
s.assertResourceGroup(ctx)
s.rgs[RecycleResourceGroup].expectedNodeNum = 3
s.Cluster.AddQueryNodes(3)
s.syncResourceConfig(ctx)
s.assertResourceGroup(ctx)
// nodes in the recycle rg should be balanced into rg1 and rg2
s.rgs["rg1"].rgCfg.Requests.NodeNum = 1
s.rgs["rg1"].rgCfg.Limits.NodeNum = 1
s.rgs["rg1"].expectedNodeNum = 1
s.rgs["rg2"].rgCfg.Requests.NodeNum = 2
s.rgs["rg2"].rgCfg.Limits.NodeNum = 2
s.rgs["rg2"].expectedNodeNum = 2
s.rgs[RecycleResourceGroup].expectedNodeNum = 0
s.syncResourceConfig(ctx)
s.assertResourceGroup(ctx)
s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 1
s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
s.rgs[DefaultResourceGroup].expectedNodeNum = 2
s.syncResourceConfig(ctx)
s.assertResourceGroup(ctx)
// the redundant node in the default rg should be balanced back into the recycle rg
s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 1
s.rgs[DefaultResourceGroup].expectedNodeNum = 1
s.rgs[RecycleResourceGroup].expectedNodeNum = 1
s.syncResourceConfig(ctx)
s.assertResourceGroup(ctx)
}
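Condensed, the declarative flow the test above walks through is two proxy calls; the helper below is hypothetical and only reuses requests and helpers already present in this file:
// declareAndSync is a hypothetical condensed variant of the flow above:
// declare a group once, then reconcile its target state in a single
// UpdateResourceGroups call instead of imperative node transfers.
func (s *ResourceGroupTestSuite) declareAndSync(ctx context.Context) {
	status, err := s.Cluster.Proxy.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
		ResourceGroup: "rg1",
		Config:        newRGConfig(1, 1),
	})
	s.NoError(err)
	s.True(merr.Ok(status))
	status, err = s.Cluster.Proxy.UpdateResourceGroups(ctx, &milvuspb.UpdateResourceGroupsRequest{
		ResourceGroups: map[string]*rgpb.ResourceGroupConfig{
			"rg1": newRGConfig(2, 2), // grow rg1 from one node to two, declaratively
		},
	})
	s.NoError(err)
	s.True(merr.Ok(status))
}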
func (s *ResourceGroupTestSuite) TestWithReplica() {
ctx := context.Background()
s.rgs = map[string]*resourceGroupConfig{
DefaultResourceGroup: {
expectedNodeNum: 1,
rgCfg: newRGConfig(1, 1),
},
RecycleResourceGroup: {
expectedNodeNum: 0,
rgCfg: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 0,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10000,
},
},
},
"rg1": {
expectedNodeNum: 1,
rgCfg: newRGConfig(1, 1),
},
"rg2": {
expectedNodeNum: 2,
rgCfg: newRGConfig(2, 2),
},
}
s.collections = map[string]*collectionConfig{
"c1": {
resourceGroups: []string{DefaultResourceGroup},
createCfg: newCreateCollectionConfig("c1"),
},
"c2": {
resourceGroups: []string{"rg1"},
createCfg: newCreateCollectionConfig("c2"),
},
"c3": {
resourceGroups: []string{"rg2"},
createCfg: newCreateCollectionConfig("c3"),
},
}
// create resource group
s.initResourceGroup(ctx)
s.Cluster.AddQueryNodes(3)
time.Sleep(100 * time.Millisecond)
s.assertResourceGroup(ctx)
// create and load replicas for testing.
s.createAndLoadCollections(ctx)
s.assertReplica(ctx)
// TODO: the current balancer does not handle moving segments between nodes well; re-enable the following test once that is fixed.
// // test transfer replica and nodes.
// // transfer one of replica in c3 from rg2 into DEFAULT rg.
// s.collections["c3"].resourceGroups = []string{DefaultResourceGroup, "rg2"}
//
// status, err := s.Cluster.Proxy.TransferReplica(ctx, &milvuspb.TransferReplicaRequest{
// DbName: s.collections["c3"].createCfg.DBName,
// CollectionName: s.collections["c3"].createCfg.CollectionName,
// SourceResourceGroup: "rg2",
// TargetResourceGroup: DefaultResourceGroup,
// NumReplica: 1,
// })
//
// s.NoError(err)
// s.True(merr.Ok(status))
//
// // test transfer node from rg2 into DEFAULT_RESOURCE_GROUP
// s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 2
// s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
// s.rgs[DefaultResourceGroup].expectedNodeNum = 2
// s.rgs["rg2"].rgCfg.Requests.NodeNum = 1
// s.rgs["rg2"].rgCfg.Limits.NodeNum = 1
// s.rgs["rg2"].expectedNodeNum = 1
// s.syncResourceConfig(ctx)
//
// s.Eventually(func() bool {
// return s.assertReplica(ctx)
// }, 10*time.Minute, 30*time.Second)
}
func (s *ResourceGroupTestSuite) syncResourceConfig(ctx context.Context) {
req := &milvuspb.UpdateResourceGroupsRequest{
ResourceGroups: make(map[string]*rgpb.ResourceGroupConfig),
}
for rgName, cfg := range s.rgs {
req.ResourceGroups[rgName] = cfg.rgCfg
}
status, err := s.Cluster.Proxy.UpdateResourceGroups(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))
// wait for recovery.
time.Sleep(100 * time.Millisecond)
}
func (s *ResourceGroupTestSuite) assertResourceGroup(ctx context.Context) {
resp, err := s.Cluster.Proxy.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
s.NoError(err)
s.True(merr.Ok(resp.Status))
s.ElementsMatch(resp.ResourceGroups, lo.Keys(s.rgs))
for _, rg := range resp.ResourceGroups {
resp, err := s.Cluster.Proxy.DescribeResourceGroup(ctx, &milvuspb.DescribeResourceGroupRequest{
ResourceGroup: rg,
})
s.NoError(err)
s.True(merr.Ok(resp.Status))
s.Equal(s.rgs[rg].expectedNodeNum, len(resp.ResourceGroup.Nodes))
s.True(proto.Equal(s.rgs[rg].rgCfg, resp.ResourceGroup.Config))
}
}
func (s *ResourceGroupTestSuite) initResourceGroup(ctx context.Context) {
status, err := s.Cluster.Proxy.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: RecycleResourceGroup,
Config: s.rgs[RecycleResourceGroup].rgCfg,
})
s.NoError(err)
s.True(merr.Ok(status))
for rgName, cfg := range s.rgs {
if rgName == RecycleResourceGroup || rgName == DefaultResourceGroup {
continue
}
status, err := s.Cluster.Proxy.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgName,
Config: cfg.rgCfg,
})
s.NoError(err)
s.True(merr.Ok(status))
}
status, err = s.Cluster.Proxy.UpdateResourceGroups(ctx, &milvuspb.UpdateResourceGroupsRequest{
ResourceGroups: map[string]*rgpb.ResourceGroupConfig{
DefaultResourceGroup: s.rgs[DefaultResourceGroup].rgCfg,
},
})
s.NoError(err)
s.True(merr.Ok(status))
}
func (s *ResourceGroupTestSuite) createAndLoadCollections(ctx context.Context) {
wg := &sync.WaitGroup{}
for _, cfg := range s.collections {
cfg := cfg
wg.Add(1)
go func() {
defer wg.Done()
s.CreateCollectionWithConfiguration(ctx, cfg.createCfg)
loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: cfg.createCfg.DBName,
CollectionName: cfg.createCfg.CollectionName,
ReplicaNumber: int32(len(cfg.resourceGroups)),
ResourceGroups: cfg.resourceGroups,
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.WaitForLoad(ctx, cfg.createCfg.CollectionName)
}()
}
wg.Wait()
}
func (s *ResourceGroupTestSuite) assertReplica(ctx context.Context) bool {
for _, cfg := range s.collections {
resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
CollectionName: cfg.createCfg.CollectionName,
DbName: cfg.createCfg.DBName,
})
s.NoError(err)
s.True(merr.Ok(resp.Status))
rgs := make(map[string]int)
for _, rg := range cfg.resourceGroups {
rgs[rg]++
}
for _, replica := range resp.GetReplicas() {
s.True(rgs[replica.ResourceGroupName] > 0)
rgs[replica.ResourceGroupName]--
s.NotZero(len(replica.NodeIds))
if len(replica.NumOutboundNode) > 0 {
return false
}
}
for _, v := range rgs {
s.Zero(v)
}
}
return true
}
func newRGConfig(request int, limit int) *rgpb.ResourceGroupConfig {
return &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: int32(request),
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: int32(limit),
},
TransferFrom: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: RecycleResourceGroup,
},
},
TransferTo: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: RecycleResourceGroup,
},
},
}
}
func newCreateCollectionConfig(collectionName string) *integration.CreateCollectionConfig {
return &integration.CreateCollectionConfig{
DBName: "",
CollectionName: collectionName,
ChannelNum: 2,
SegmentNum: 2,
RowNumPerSegment: 100,
Dim: 128,
}
}
func TestResourceGroup(t *testing.T) {
suite.Run(t, new(ResourceGroupTestSuite))
}

View File

@ -0,0 +1,84 @@
package integration
import (
"context"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
)
type CreateCollectionConfig struct {
DBName string
CollectionName string
ChannelNum int
SegmentNum int
RowNumPerSegment int
Dim int
}
func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) {
schema := ConstructSchema(cfg.CollectionName, cfg.Dim, true)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
s.NotNil(marshaledSchema)
createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: cfg.DBName,
CollectionName: cfg.CollectionName,
Schema: marshaledSchema,
ShardsNum: int32(cfg.ChannelNum),
})
s.NoError(err)
s.True(merr.Ok(createCollectionStatus))
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.True(merr.Ok(showCollectionsResp.Status))
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
for i := 0; i < cfg.SegmentNum; i++ {
fVecColumn := NewFloatVectorFieldData(FloatVecField, cfg.RowNumPerSegment, cfg.Dim)
hashKeys := GenerateHashKeys(cfg.RowNumPerSegment)
insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: cfg.DBName,
CollectionName: cfg.CollectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(cfg.RowNumPerSegment),
})
s.NoError(err)
s.True(merr.Ok(insertResult.Status))
flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: cfg.DBName,
CollectionNames: []string{cfg.CollectionName},
})
s.NoError(err)
segmentIDs, has := flushResp.GetCollSegIDs()[cfg.CollectionName]
ids := segmentIDs.GetData()
s.Require().NotEmpty(segmentIDs)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[cfg.CollectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, cfg.DBName, cfg.CollectionName)
}
// create index
createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
DbName: cfg.DBName,
CollectionName: cfg.CollectionName,
FieldName: FloatVecField,
IndexName: "_default",
ExtraParams: ConstructIndexParam(cfg.Dim, IndexFaissIvfFlat, metric.L2),
})
s.NoError(err)
s.True(merr.Ok(createIndexStatus))
s.WaitForIndexBuilt(ctx, cfg.CollectionName, FloatVecField)
}
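For reference, a hypothetical call site in the same package; the values mirror newCreateCollectionConfig in the rg suite above, and loading is deliberately left to the caller:
// createDemoCollection is a hypothetical example; it only shows the shape of a call.
func (s *MiniClusterSuite) createDemoCollection(ctx context.Context) {
	s.CreateCollectionWithConfiguration(ctx, &CreateCollectionConfig{
		DBName:           "",
		CollectionName:   "demo_collection",
		ChannelNum:       2,
		SegmentNum:       2,
		RowNumPerSegment: 100,
		Dim:              128,
	})
	// Load/query steps are up to the caller; see createAndLoadCollections in the
	// rg suite for a LoadCollection + WaitForLoad follow-up.
}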