From 99279e0bef386862a4a5f5f40ac69a41902e852e Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 5 Dec 2024 17:27:53 +0800 Subject: [PATCH] enhance: remove the rpc layer of coordinator when enabling standalone or mixcoord (#38246) issue: #33285 pr: #37815 - remove the rpc layer of coordinator when enabling standalone or mixcoord - move health check into init --------- Signed-off-by: chyezh --- cmd/milvus/util.go | 8 +-- cmd/roles/roles.go | 7 ++ configs/milvus.yaml | 1 + go.mod | 2 - go.sum | 3 - internal/coordinator/coordclient/registry.go | 18 +++-- .../coordinator/coordclient/registry_test.go | 4 ++ internal/distributed/querycoord/service.go | 14 ---- internal/querycoordv2/server.go | 17 +++++ internal/util/grpcclient/local_grpc_client.go | 68 +++++++++++++++++++ .../util/grpcclient/local_grpc_client_test.go | 46 +++++++++++++ pkg/util/paramtable/component_param.go | 31 +++++++++ pkg/util/paramtable/component_param_test.go | 4 ++ 13 files changed, 187 insertions(+), 36 deletions(-) create mode 100644 internal/util/grpcclient/local_grpc_client.go create mode 100644 internal/util/grpcclient/local_grpc_client_test.go diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index baa94db368..35068a6d32 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -20,7 +20,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/cmd/roles" - "github.com/milvus-io/milvus/internal/coordinator/coordclient" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -172,12 +171,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp()) os.Exit(-1) } - coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{ - ServerType: serverType, - EnableQueryCoord: role.EnableQueryCoord, - EnableDataCoord: role.EnableDataCoord, - EnableRootCoord: role.EnableRootCoord, - }) + return role } diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 82de6f24e3..43a52a1a42 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/cmd/components" + "github.com/milvus-io/milvus/internal/coordinator/coordclient" "github.com/milvus-io/milvus/internal/http" "github.com/milvus-io/milvus/internal/http/healthz" "github.com/milvus-io/milvus/internal/util/dependency" @@ -375,6 +376,12 @@ func (mr *MilvusRoles) Run() { paramtable.Init() paramtable.SetRole(mr.ServerType) } + coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{ + ServerType: mr.ServerType, + EnableQueryCoord: mr.EnableQueryCoord, + EnableDataCoord: mr.EnableDataCoord, + EnableRootCoord: mr.EnableRootCoord, + }) enableComponents := []bool{ mr.EnableRootCoord, diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8d59312138..eae92ce628 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -822,6 +822,7 @@ common: usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction + localRPCEnabled: true # enable local rpc for internal communication when mix or standalone mode. # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/go.mod b/go.mod index 0425af6ff4..12891387bb 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,6 @@ require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d7 require ( github.com/bits-and-blooms/bitset v1.10.0 github.com/bytedance/sonic v1.9.1 - github.com/fullstorydev/grpchan v1.1.1 github.com/greatroar/blobloom v0.8.0 github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 @@ -140,7 +139,6 @@ require ( github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect - github.com/jhump/protoreflect v1.12.0 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect diff --git a/go.sum b/go.sum index 814fe0c955..4fc1b2a749 100644 --- a/go.sum +++ b/go.sum @@ -265,8 +265,6 @@ github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fullstorydev/grpchan v1.1.1 h1:heQqIJlAv5Cnks9a70GRL2EJke6QQoUB25VGR6TZQas= -github.com/fullstorydev/grpchan v1.1.1/go.mod h1:f4HpiV8V6htfY/K44GWV1ESQzHBTq7DinhzqQ95lpgc= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc= @@ -499,7 +497,6 @@ github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSl github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= -github.com/jhump/protoreflect v1.12.0 h1:1NQ4FpWMgn3by/n1X0fbeKEUxP1wBt7+Oitpv01HR10= github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM= diff --git a/internal/coordinator/coordclient/registry.go b/internal/coordinator/coordclient/registry.go index 98737424c2..8ba4a721cd 100644 --- a/internal/coordinator/coordclient/registry.go +++ b/internal/coordinator/coordclient/registry.go @@ -6,7 +6,6 @@ import ( "go.uber.org/zap" - "github.com/fullstorydev/grpchan/inprocgrpc" dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client" rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" @@ -14,7 +13,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -49,6 +50,9 @@ type LocalClientRoleConfig struct { // EnableLocalClientRole init localable roles func EnableLocalClientRole(cfg *LocalClientRoleConfig) { + if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { + return + } if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole { return } @@ -60,9 +64,7 @@ func RegisterQueryCoordServer(server querypb.QueryCoordServer) { if !enableLocal.EnableQueryCoord { return } - channel := &inprocgrpc.Channel{} - channel.RegisterService(&querypb.QueryCoord_ServiceDesc, server) - newLocalClient := querypb.NewQueryCoordClient(channel) + newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient) glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient}) log.Info("register query coord server", zap.Any("enableLocalClient", enableLocal)) } @@ -72,9 +74,7 @@ func RegisterDataCoordServer(server datapb.DataCoordServer) { if !enableLocal.EnableDataCoord { return } - channel := &inprocgrpc.Channel{} - channel.RegisterService(&datapb.DataCoord_ServiceDesc, server) - newLocalClient := datapb.NewDataCoordClient(channel) + newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient) glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient}) log.Info("register data coord server", zap.Any("enableLocalClient", enableLocal)) } @@ -84,9 +84,7 @@ func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { if !enableLocal.EnableRootCoord { return } - channel := &inprocgrpc.Channel{} - channel.RegisterService(&rootcoordpb.RootCoord_ServiceDesc, server) - newLocalClient := rootcoordpb.NewRootCoordClient(channel) + newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient) glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient}) log.Info("register root coord server", zap.Any("enableLocalClient", enableLocal)) } diff --git a/internal/coordinator/coordclient/registry_test.go b/internal/coordinator/coordclient/registry_test.go index 8ed97ac3d5..6752b637cc 100644 --- a/internal/coordinator/coordclient/registry_test.go +++ b/internal/coordinator/coordclient/registry_test.go @@ -9,10 +9,14 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestRegistry(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(paramtable.Get().CommonCfg.LocalRPCEnabled.Key, "true") + assert.False(t, enableLocal.EnableQueryCoord) assert.False(t, enableLocal.EnableDataCoord) assert.False(t, enableLocal.EnableRootCoord) diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 77d4cc2596..a2e2e6d7e2 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -37,7 +37,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" qc "github.com/milvus-io/milvus/internal/querycoordv2" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" @@ -172,17 +171,9 @@ func (s *Server) init() error { } // wait for master init or healthy - log.Info("QueryCoord try to wait for RootCoord ready") - err = componentutil.WaitForComponentHealthy(s.loopCtx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200) - if err != nil { - log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err)) - panic(err) - } - if err := s.SetRootCoord(s.rootCoord); err != nil { panic(err) } - log.Info("QueryCoord report RootCoord ready") // --- Data service client --- if s.dataCoord == nil { @@ -190,11 +181,6 @@ func (s *Server) init() error { } log.Info("QueryCoord try to wait for DataCoord ready") - err = componentutil.WaitForComponentHealthy(s.loopCtx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200) - if err != nil { - log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err)) - panic(err) - } if err := s.SetDataCoord(s.dataCoord); err != nil { panic(err) } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 0b6aa3d36d..ee93bb0f2f 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -53,6 +53,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -218,6 +219,22 @@ func (s *Server) Init() error { } func (s *Server) initQueryCoord() error { + // wait for master init or healthy + log.Info("QueryCoord try to wait for RootCoord ready") + if err := componentutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200); err != nil { + log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err)) + panic(err) + } + log.Info("QueryCoord report RootCoord ready") + + // wait for master init or healthy + log.Info("QueryCoord try to wait for DataCoord ready") + if err := componentutil.WaitForComponentHealthy(s.ctx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200); err != nil { + log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err)) + panic(err) + } + log.Info("QueryCoord report DataCoord ready") + s.UpdateStateCode(commonpb.StateCode_Initializing) log.Info("start init querycoord", zap.Any("State", commonpb.StateCode_Initializing)) // Init KV and ID allocator diff --git a/internal/util/grpcclient/local_grpc_client.go b/internal/util/grpcclient/local_grpc_client.go new file mode 100644 index 0000000000..21afecf061 --- /dev/null +++ b/internal/util/grpcclient/local_grpc_client.go @@ -0,0 +1,68 @@ +package grpcclient + +import ( + "context" + "fmt" + "reflect" + "strings" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var _ grpc.ClientConnInterface = &localConn{} + +// NewLocalGRPCClient creates a grpc client that calls the server directly. +// !!! Warning: it didn't make any network or serialization/deserialization, so it's not promise concurrent safe. +// and there's no interceptor for client and server like the common grpc client/server. +func NewLocalGRPCClient[C any, S any](desc *grpc.ServiceDesc, server S, clientCreator func(grpc.ClientConnInterface) C) C { + return clientCreator(&localConn{ + serviceDesc: desc, + server: server, + }) +} + +// localConn is a grpc.ClientConnInterface implementation that calls the server directly. +type localConn struct { + serviceDesc *grpc.ServiceDesc // ServiceDesc is the descriptor for this service. + server interface{} // the server object. +} + +// Invoke calls the server method directly. +func (c *localConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { + methodDesc := c.findMethod(method) + if methodDesc == nil { + return status.Errorf(codes.Unimplemented, fmt.Sprintf("method %s not implemented", method)) + } + resp, err := methodDesc.Handler(c.server, ctx, func(in any) error { + reflect.ValueOf(in).Elem().Set(reflect.ValueOf(args).Elem()) + return nil + }, nil) + if err != nil { + return err + } + reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(resp).Elem()) + return nil +} + +// NewStream is not supported by now, wait for implementation. +func (c *localConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + panic("we don't support local stream rpc by now") +} + +// findMethod finds the method descriptor by the full method name. +func (c *localConn) findMethod(fullMethodName string) *grpc.MethodDesc { + strs := strings.SplitN(fullMethodName[1:], "/", 2) + serviceName := strs[0] + methodName := strs[1] + if c.serviceDesc.ServiceName != serviceName { + return nil + } + for i := range c.serviceDesc.Methods { + if c.serviceDesc.Methods[i].MethodName == methodName { + return &c.serviceDesc.Methods[i] + } + } + return nil +} diff --git a/internal/util/grpcclient/local_grpc_client_test.go b/internal/util/grpcclient/local_grpc_client_test.go new file mode 100644 index 0000000000..bcd59e62a9 --- /dev/null +++ b/internal/util/grpcclient/local_grpc_client_test.go @@ -0,0 +1,46 @@ +package grpcclient + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" +) + +type mockRootCoordServer struct { + t *testing.T + *rootcoordpb.UnimplementedRootCoordServer +} + +func (s *mockRootCoordServer) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { + assert.NotNil(s.t, req) + assert.Equal(s.t, uint32(100), req.Count) + return &rootcoordpb.AllocIDResponse{ + ID: 1, + Count: 2, + }, nil +} + +func TestLocalGRPCClient(t *testing.T) { + localClient := NewLocalGRPCClient( + &rootcoordpb.RootCoord_ServiceDesc, + &mockRootCoordServer{ + t: t, + UnimplementedRootCoordServer: &rootcoordpb.UnimplementedRootCoordServer{}, + }, + rootcoordpb.NewRootCoordClient, + ) + result, err := localClient.AllocTimestamp(context.Background(), &rootcoordpb.AllocTimestampRequest{}) + assert.Error(t, err) + assert.Nil(t, result) + + result2, err := localClient.AllocID(context.Background(), &rootcoordpb.AllocIDRequest{ + Count: 100, + }) + assert.NoError(t, err) + assert.NotNil(t, result2) + assert.Equal(t, int64(1), result2.ID) + assert.Equal(t, uint32(2), result2.Count) +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index cf02043647..cef8375f48 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -264,6 +264,12 @@ type commonConfig struct { ReadOnlyPrivileges ParamItem `refreshable:"false"` ReadWritePrivileges ParamItem `refreshable:"false"` AdminPrivileges ParamItem `refreshable:"false"` + + HealthCheckInterval ParamItem `refreshable:"true"` + HealthCheckRPCTimeout ParamItem `refreshable:"true"` + + // Local RPC enabled for milvus internal communication when mix or standalone mode. + LocalRPCEnabled ParamItem `refreshable:"false"` } func (p *commonConfig) init(base *BaseTable) { @@ -913,6 +919,31 @@ This helps Milvus-CDC synchronize incremental data`, Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`, } p.AdminPrivileges.Init(base.mgr) + + p.HealthCheckInterval = ParamItem{ + Key: "common.healthcheck.interval.seconds", + Version: "2.4.8", + DefaultValue: "30", + Doc: `health check interval in seconds, default 30s`, + } + p.HealthCheckInterval.Init(base.mgr) + + p.HealthCheckRPCTimeout = ParamItem{ + Key: "common.healthcheck.timeout.seconds", + Version: "2.4.8", + DefaultValue: "10", + Doc: `RPC timeout for health check request`, + } + p.HealthCheckRPCTimeout.Init(base.mgr) + + p.LocalRPCEnabled = ParamItem{ + Key: "common.localRPCEnabled", + Version: "2.4.18", + DefaultValue: "true", + Doc: `enable local rpc for internal communication when mix or standalone mode.`, + Export: true, + } + p.LocalRPCEnabled.Init(base.mgr) } type gpuConfig struct { diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index d979746d4c..921217c4cb 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -129,6 +129,10 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings())) + + assert.False(t, params.CommonCfg.LocalRPCEnabled.GetAsBool()) + params.Save("common.localRPCEnabled", "true") + assert.True(t, params.CommonCfg.LocalRPCEnabled.GetAsBool()) }) t.Run("test rootCoordConfig", func(t *testing.T) {