diff --git a/client/entity/resource_group.go b/client/entity/resource_group.go new file mode 100644 index 0000000000..09d5af6c36 --- /dev/null +++ b/client/entity/resource_group.go @@ -0,0 +1,54 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +type ResourceGroup struct { + Name string + Capacity int32 + NumAvailableNode int32 + NumLoadedReplica map[string]int32 + NumOutgoingNode map[string]int32 + NumIncomingNode map[string]int32 + Config *ResourceGroupConfig + Nodes []NodeInfo +} + +type NodeInfo struct { + NodeID int64 + Address string + HostName string +} + +type ResourceGroupLimit struct { + NodeNum int32 +} + +type ResourceGroupTransfer struct { + ResourceGroup string +} + +type ResourceGroupNodeFilter struct { + NodeLabels map[string]string +} + +type ResourceGroupConfig struct { + Requests ResourceGroupLimit + Limits ResourceGroupLimit + TransferFrom []*ResourceGroupTransfer + TransferTo []*ResourceGroupTransfer + NodeFilter ResourceGroupNodeFilter +} diff --git a/client/milvusclient/resource_group.go b/client/milvusclient/resource_group.go index e91654548d..153b3bcbfa 100644 --- a/client/milvusclient/resource_group.go +++ b/client/milvusclient/resource_group.go @@ -19,9 +19,13 @@ package milvusclient import ( "context" + "github.com/samber/lo" "google.golang.org/grpc" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" + "github.com/milvus-io/milvus/client/v2/entity" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -63,3 +67,78 @@ func (c *Client) DropResourceGroup(ctx context.Context, opt DropResourceGroupOpt return err } + +func (c *Client) DescribeResourceGroup(ctx context.Context, opt DescribeResourceGroupOption, callOptions ...grpc.CallOption) (*entity.ResourceGroup, error) { + req := opt.Request() + + var rg *entity.ResourceGroup + err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error { + resp, err := milvusService.DescribeResourceGroup(ctx, req, callOptions...) + if err = merr.CheckRPCCall(resp, err); err != nil { + return err + } + + resultRg := resp.GetResourceGroup() + rg = &entity.ResourceGroup{ + Name: resultRg.GetName(), + Capacity: resultRg.GetCapacity(), + NumAvailableNode: resultRg.GetNumAvailableNode(), + NumLoadedReplica: resultRg.GetNumLoadedReplica(), + NumOutgoingNode: resultRg.GetNumOutgoingNode(), + NumIncomingNode: resultRg.GetNumIncomingNode(), + Config: &entity.ResourceGroupConfig{ + Requests: entity.ResourceGroupLimit{ + NodeNum: resultRg.GetConfig().GetRequests().GetNodeNum(), + }, + Limits: entity.ResourceGroupLimit{ + NodeNum: resultRg.GetConfig().GetLimits().GetNodeNum(), + }, + TransferFrom: lo.Map(resultRg.GetConfig().GetTransferFrom(), func(transfer *rgpb.ResourceGroupTransfer, i int) *entity.ResourceGroupTransfer { + return &entity.ResourceGroupTransfer{ + ResourceGroup: transfer.GetResourceGroup(), + } + }), + TransferTo: lo.Map(resultRg.GetConfig().GetTransferTo(), func(transfer *rgpb.ResourceGroupTransfer, i int) *entity.ResourceGroupTransfer { + return &entity.ResourceGroupTransfer{ + ResourceGroup: transfer.GetResourceGroup(), + } + }), + NodeFilter: entity.ResourceGroupNodeFilter{ + NodeLabels: entity.KvPairsMap(resultRg.GetConfig().GetNodeFilter().GetNodeLabels()), + }, + }, + Nodes: lo.Map(resultRg.GetNodes(), func(node *commonpb.NodeInfo, i int) entity.NodeInfo { + return entity.NodeInfo{ + NodeID: node.GetNodeId(), + Address: node.GetAddress(), + HostName: node.GetHostname(), + } + }), + } + + return nil + }) + return rg, err +} + +func (c *Client) UpdateResourceGroup(ctx context.Context, opt UpdateResourceGroupOption, callOptions ...grpc.CallOption) error { + req := opt.Request() + + err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error { + resp, err := milvusService.UpdateResourceGroups(ctx, req, callOptions...) + return merr.CheckRPCCall(resp, err) + }) + + return err +} + +func (c *Client) TransferReplica(ctx context.Context, opt TransferReplicaOption, callOptions ...grpc.CallOption) error { + req := opt.Request() + + err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error { + resp, err := milvusService.TransferReplica(ctx, req, callOptions...) + return merr.CheckRPCCall(resp, err) + }) + + return err +} diff --git a/client/milvusclient/resource_group_option.go b/client/milvusclient/resource_group_option.go index 5f70f69d0b..6c71405591 100644 --- a/client/milvusclient/resource_group_option.go +++ b/client/milvusclient/resource_group_option.go @@ -17,8 +17,11 @@ package milvusclient import ( + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" + "github.com/milvus-io/milvus/client/v2/entity" ) type ListResourceGroupsOption interface { @@ -90,3 +93,101 @@ func (opt *dropResourceGroupOption) Request() *milvuspb.DropResourceGroupRequest func NewDropResourceGroupOption(name string) *dropResourceGroupOption { return &dropResourceGroupOption{name: name} } + +type DescribeResourceGroupOption interface { + Request() *milvuspb.DescribeResourceGroupRequest +} + +type describeResourceGroupOption struct { + name string +} + +func (opt *describeResourceGroupOption) Request() *milvuspb.DescribeResourceGroupRequest { + return &milvuspb.DescribeResourceGroupRequest{ + ResourceGroup: opt.name, + } +} + +func NewDescribeResourceGroupOption(name string) *describeResourceGroupOption { + return &describeResourceGroupOption{name: name} +} + +type UpdateResourceGroupOption interface { + Request() *milvuspb.UpdateResourceGroupsRequest +} + +type updateResourceGroupOption struct { + name string + rgConfig *entity.ResourceGroupConfig +} + +func (opt *updateResourceGroupOption) Request() *milvuspb.UpdateResourceGroupsRequest { + return &milvuspb.UpdateResourceGroupsRequest{ + ResourceGroups: map[string]*rgpb.ResourceGroupConfig{ + opt.name: { + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: opt.rgConfig.Requests.NodeNum, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: opt.rgConfig.Limits.NodeNum, + }, + TransferFrom: lo.Map(opt.rgConfig.TransferFrom, func(transfer *entity.ResourceGroupTransfer, i int) *rgpb.ResourceGroupTransfer { + return &rgpb.ResourceGroupTransfer{ + ResourceGroup: transfer.ResourceGroup, + } + }), + TransferTo: lo.Map(opt.rgConfig.TransferTo, func(transfer *entity.ResourceGroupTransfer, i int) *rgpb.ResourceGroupTransfer { + return &rgpb.ResourceGroupTransfer{ + ResourceGroup: transfer.ResourceGroup, + } + }), + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: entity.MapKvPairs(opt.rgConfig.NodeFilter.NodeLabels), + }, + }, + }, + } +} + +func NewUpdateResourceGroupOption(name string, resourceGroupConfig *entity.ResourceGroupConfig) *updateResourceGroupOption { + return &updateResourceGroupOption{ + name: name, + rgConfig: resourceGroupConfig, + } +} + +type TransferReplicaOption interface { + Request() *milvuspb.TransferReplicaRequest +} + +type transferReplicaOption struct { + collectionName string + sourceRG string + targetRG string + replicaNum int64 + dbName string +} + +func (opt *transferReplicaOption) WithDBName(dbName string) *transferReplicaOption { + opt.dbName = dbName + return opt +} + +func (opt *transferReplicaOption) Request() *milvuspb.TransferReplicaRequest { + return &milvuspb.TransferReplicaRequest{ + CollectionName: opt.collectionName, + SourceResourceGroup: opt.sourceRG, + TargetResourceGroup: opt.targetRG, + NumReplica: opt.replicaNum, + DbName: opt.dbName, + } +} + +func NewTransferReplicaOption(collectionName, sourceGroup, targetGroup string, replicaNum int64) *transferReplicaOption { + return &transferReplicaOption{ + collectionName: collectionName, + sourceRG: sourceGroup, + targetRG: targetGroup, + replicaNum: replicaNum, + } +} diff --git a/client/milvusclient/resource_group_test.go b/client/milvusclient/resource_group_test.go index 2e87647bf0..757ecf305d 100644 --- a/client/milvusclient/resource_group_test.go +++ b/client/milvusclient/resource_group_test.go @@ -19,14 +19,19 @@ package milvusclient import ( "context" "fmt" + "math/rand" "testing" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" + "github.com/milvus-io/milvus/client/v2/entity" + "github.com/milvus-io/milvus/pkg/util/merr" ) type ResourceGroupSuite struct { @@ -103,6 +108,169 @@ func (s *ResourceGroupSuite) TestDropResourceGroup() { }) } +func (s *ResourceGroupSuite) TestDescribeResourceGroup() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("success", func() { + limit := rand.Int31n(10) + 1 + request := rand.Int31n(10) + 1 + rgName := fmt.Sprintf("rg_%s", s.randString(6)) + transferFroms := []string{s.randString(6), s.randString(6)} + transferTos := []string{s.randString(6), s.randString(6)} + labels := map[string]string{ + "label1": s.randString(10), + } + node := entity.NodeInfo{ + NodeID: rand.Int63(), + Address: s.randString(6), + HostName: s.randString(10), + } + s.mock.EXPECT().DescribeResourceGroup(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, drgr *milvuspb.DescribeResourceGroupRequest) (*milvuspb.DescribeResourceGroupResponse, error) { + s.Equal(rgName, drgr.GetResourceGroup()) + return &milvuspb.DescribeResourceGroupResponse{ + ResourceGroup: &milvuspb.ResourceGroup{ + Name: rgName, + Config: &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: request, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: limit, + }, + TransferFrom: lo.Map(transferFroms, func(transfer string, i int) *rgpb.ResourceGroupTransfer { + return &rgpb.ResourceGroupTransfer{ + ResourceGroup: transfer, + } + }), + TransferTo: lo.Map(transferTos, func(transfer string, i int) *rgpb.ResourceGroupTransfer { + return &rgpb.ResourceGroupTransfer{ + ResourceGroup: transfer, + } + }), + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: entity.MapKvPairs(labels), + }, + }, + Nodes: []*commonpb.NodeInfo{ + {NodeId: node.NodeID, Address: node.Address, Hostname: node.HostName}, + }, + }, + }, nil + }).Once() + opt := NewDescribeResourceGroupOption(rgName) + rg, err := s.client.DescribeResourceGroup(ctx, opt) + s.NoError(err) + s.Equal(rgName, rg.Name) + s.Equal(limit, rg.Config.Limits.NodeNum) + s.Equal(request, rg.Config.Requests.NodeNum) + s.ElementsMatch(lo.Map(transferFroms, func(transferFrom string, _ int) *entity.ResourceGroupTransfer { + return &entity.ResourceGroupTransfer{ResourceGroup: transferFrom} + }), rg.Config.TransferFrom) + s.ElementsMatch(lo.Map(transferTos, func(transferTo string, _ int) *entity.ResourceGroupTransfer { + return &entity.ResourceGroupTransfer{ResourceGroup: transferTo} + }), rg.Config.TransferTo) + s.Equal(labels, rg.Config.NodeFilter.NodeLabels) + s.ElementsMatch([]entity.NodeInfo{node}, rg.Nodes) + }) + + s.Run("failure", func() { + rgName := fmt.Sprintf("rg_%s", s.randString(6)) + s.mock.EXPECT().DescribeResourceGroup(mock.Anything, mock.Anything).Return(nil, errors.New("mocked")).Once() + opt := NewDescribeResourceGroupOption(rgName) + _, err := s.client.DescribeResourceGroup(ctx, opt) + s.Error(err) + }) +} + +func (s *ResourceGroupSuite) TestUpdateResourceGroup() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("success", func() { + limit := rand.Int31n(10) + 1 + request := rand.Int31n(10) + 1 + rgName := fmt.Sprintf("rg_%s", s.randString(6)) + transferFroms := []string{s.randString(6), s.randString(6)} + transferTos := []string{s.randString(6), s.randString(6)} + labels := map[string]string{ + "label1": s.randString(10), + } + s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, urgr *milvuspb.UpdateResourceGroupsRequest) (*commonpb.Status, error) { + config, ok := urgr.GetResourceGroups()[rgName] + s.Require().True(ok) + s.Equal(request, config.GetRequests().GetNodeNum()) + s.Equal(limit, config.GetLimits().GetNodeNum()) + s.ElementsMatch(transferFroms, lo.Map(config.GetTransferFrom(), func(transfer *rgpb.ResourceGroupTransfer, i int) string { + return transfer.GetResourceGroup() + })) + s.ElementsMatch(transferTos, lo.Map(config.GetTransferTo(), func(transfer *rgpb.ResourceGroupTransfer, i int) string { + return transfer.GetResourceGroup() + })) + s.Equal(labels, entity.KvPairsMap(config.GetNodeFilter().GetNodeLabels())) + return merr.Success(), nil + }).Once() + opt := NewUpdateResourceGroupOption(rgName, &entity.ResourceGroupConfig{ + Requests: entity.ResourceGroupLimit{NodeNum: request}, + Limits: entity.ResourceGroupLimit{NodeNum: limit}, + TransferFrom: []*entity.ResourceGroupTransfer{ + {ResourceGroup: transferFroms[0]}, + {ResourceGroup: transferFroms[1]}, + }, + TransferTo: []*entity.ResourceGroupTransfer{ + {ResourceGroup: transferTos[0]}, + {ResourceGroup: transferTos[1]}, + }, + NodeFilter: entity.ResourceGroupNodeFilter{ + NodeLabels: labels, + }, + }) + err := s.client.UpdateResourceGroup(ctx, opt) + s.NoError(err) + }) + + s.Run("failure", func() { + rgName := fmt.Sprintf("rg_%s", s.randString(6)) + s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.Anything).Return(nil, errors.New("mocked")).Once() + opt := NewUpdateResourceGroupOption(rgName, &entity.ResourceGroupConfig{}) + err := s.client.UpdateResourceGroup(ctx, opt) + s.Error(err) + }) +} + +func (s *ResourceGroupSuite) TestTransferReplica() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("success", func() { + collName := fmt.Sprintf("rg_%s", s.randString(6)) + dbName := fmt.Sprintf("db_%s", s.randString(6)) + from := fmt.Sprintf("rg_%s", s.randString(6)) + to := fmt.Sprintf("rg_%s", s.randString(6)) + replicaNum := rand.Int63n(10) + 1 + s.mock.EXPECT().TransferReplica(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, tr *milvuspb.TransferReplicaRequest) (*commonpb.Status, error) { + s.Equal(collName, tr.GetCollectionName()) + s.Equal(dbName, tr.GetDbName()) + s.Equal(from, tr.GetSourceResourceGroup()) + s.Equal(to, tr.GetTargetResourceGroup()) + return merr.Success(), nil + }).Once() + opt := NewTransferReplicaOption(collName, from, to, replicaNum).WithDBName(dbName) + err := s.client.TransferReplica(ctx, opt) + s.NoError(err) + }) + + s.Run("failure", func() { + rgName := fmt.Sprintf("rg_%s", s.randString(6)) + from := fmt.Sprintf("rg_%s", s.randString(6)) + to := fmt.Sprintf("rg_%s", s.randString(6)) + s.mock.EXPECT().TransferReplica(mock.Anything, mock.Anything).Return(nil, errors.New("mocked")).Once() + opt := NewTransferReplicaOption(rgName, from, to, 1) + err := s.client.TransferReplica(ctx, opt) + s.Error(err) + }) +} + func TestResourceGroup(t *testing.T) { suite.Run(t, new(ResourceGroupSuite)) }