mirror of https://github.com/milvus-io/milvus.git
enhance: [10kcp] Revert "enhance: remove the rpc level of coordinator (#37914)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/37981/head
parent
bf90e55319
commit
4845e4d679
|
@ -20,7 +20,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/cmd/roles"
|
||||
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
|
@ -172,12 +171,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
|
|||
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
|
||||
os.Exit(-1)
|
||||
}
|
||||
coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{
|
||||
ServerType: serverType,
|
||||
EnableQueryCoord: role.EnableQueryCoord,
|
||||
EnableDataCoord: role.EnableDataCoord,
|
||||
EnableRootCoord: role.EnableRootCoord,
|
||||
})
|
||||
|
||||
return role
|
||||
}
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -66,7 +66,6 @@ require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d7
|
|||
require (
|
||||
github.com/bits-and-blooms/bitset v1.10.0
|
||||
github.com/bytedance/sonic v1.9.1
|
||||
github.com/fullstorydev/grpchan v1.1.1
|
||||
github.com/greatroar/blobloom v0.8.0
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.2
|
||||
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
|
||||
|
@ -140,7 +139,6 @@ require (
|
|||
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect
|
||||
github.com/jhump/protoreflect v1.12.0 // indirect
|
||||
github.com/jonboulle/clockwork v0.2.2 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/asmfmt v1.3.2 // indirect
|
||||
|
|
3
go.sum
3
go.sum
|
@ -265,8 +265,6 @@ github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z
|
|||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/fullstorydev/grpchan v1.1.1 h1:heQqIJlAv5Cnks9a70GRL2EJke6QQoUB25VGR6TZQas=
|
||||
github.com/fullstorydev/grpchan v1.1.1/go.mod h1:f4HpiV8V6htfY/K44GWV1ESQzHBTq7DinhzqQ95lpgc=
|
||||
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
|
||||
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
|
||||
github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
|
||||
|
@ -499,7 +497,6 @@ github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSl
|
|||
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
|
||||
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
|
||||
github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
|
||||
github.com/jhump/protoreflect v1.12.0 h1:1NQ4FpWMgn3by/n1X0fbeKEUxP1wBt7+Oitpv01HR10=
|
||||
github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
|
||||
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM=
|
||||
|
|
|
@ -1,164 +0,0 @@
|
|||
package coordclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/fullstorydev/grpchan/inprocgrpc"
|
||||
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
|
||||
qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
|
||||
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/syncutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
// localClient is a client that can access local server directly
|
||||
type localClient struct {
|
||||
queryCoordClient *syncutil.Future[types.QueryCoordClient]
|
||||
dataCoordClient *syncutil.Future[types.DataCoordClient]
|
||||
rootCoordClient *syncutil.Future[types.RootCoordClient]
|
||||
}
|
||||
|
||||
var (
|
||||
enableLocal *LocalClientRoleConfig // a global map to store all can be local accessible roles.
|
||||
glocalClient *localClient // !!! WARNING: local client will ignore all interceptor of grpc client and server.
|
||||
)
|
||||
|
||||
func init() {
|
||||
enableLocal = &LocalClientRoleConfig{}
|
||||
glocalClient = &localClient{
|
||||
queryCoordClient: syncutil.NewFuture[types.QueryCoordClient](),
|
||||
dataCoordClient: syncutil.NewFuture[types.DataCoordClient](),
|
||||
rootCoordClient: syncutil.NewFuture[types.RootCoordClient](),
|
||||
}
|
||||
}
|
||||
|
||||
type LocalClientRoleConfig struct {
|
||||
ServerType string
|
||||
EnableQueryCoord bool
|
||||
EnableDataCoord bool
|
||||
EnableRootCoord bool
|
||||
}
|
||||
|
||||
// EnableLocalClientRole init localable roles
|
||||
func EnableLocalClientRole(cfg *LocalClientRoleConfig) {
|
||||
if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole {
|
||||
return
|
||||
}
|
||||
enableLocal = cfg
|
||||
}
|
||||
|
||||
// RegisterQueryCoordServer register query coord server
|
||||
func RegisterQueryCoordServer(server querypb.QueryCoordServer) {
|
||||
if !enableLocal.EnableQueryCoord {
|
||||
return
|
||||
}
|
||||
channel := &inprocgrpc.Channel{}
|
||||
channel.RegisterService(&querypb.QueryCoord_ServiceDesc, server)
|
||||
newLocalClient := querypb.NewQueryCoordClient(channel)
|
||||
glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient})
|
||||
log.Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
|
||||
}
|
||||
|
||||
// RegsterDataCoordServer register data coord server
|
||||
func RegisterDataCoordServer(server datapb.DataCoordServer) {
|
||||
if !enableLocal.EnableDataCoord {
|
||||
return
|
||||
}
|
||||
channel := &inprocgrpc.Channel{}
|
||||
channel.RegisterService(&datapb.DataCoord_ServiceDesc, server)
|
||||
newLocalClient := datapb.NewDataCoordClient(channel)
|
||||
glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient})
|
||||
log.Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
|
||||
}
|
||||
|
||||
// RegisterRootCoordServer register root coord server
|
||||
func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
|
||||
if !enableLocal.EnableRootCoord {
|
||||
return
|
||||
}
|
||||
channel := &inprocgrpc.Channel{}
|
||||
channel.RegisterService(&rootcoordpb.RootCoord_ServiceDesc, server)
|
||||
newLocalClient := rootcoordpb.NewRootCoordClient(channel)
|
||||
glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient})
|
||||
log.Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
|
||||
}
|
||||
|
||||
// GetQueryCoordClient return query coord client
|
||||
func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient {
|
||||
var client types.QueryCoordClient
|
||||
var err error
|
||||
if enableLocal.EnableQueryCoord {
|
||||
client, err = glocalClient.queryCoordClient.GetWithContext(ctx)
|
||||
} else {
|
||||
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
|
||||
client, err = qcc.NewClient(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("get query coord client failed: %v", err))
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
// GetDataCoordClient return data coord client
|
||||
func GetDataCoordClient(ctx context.Context) types.DataCoordClient {
|
||||
var client types.DataCoordClient
|
||||
var err error
|
||||
if enableLocal.EnableDataCoord {
|
||||
client, err = glocalClient.dataCoordClient.GetWithContext(ctx)
|
||||
} else {
|
||||
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
|
||||
client, err = dcc.NewClient(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("get data coord client failed: %v", err))
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
// GetRootCoordClient return root coord client
|
||||
func GetRootCoordClient(ctx context.Context) types.RootCoordClient {
|
||||
var client types.RootCoordClient
|
||||
var err error
|
||||
if enableLocal.EnableRootCoord {
|
||||
client, err = glocalClient.rootCoordClient.GetWithContext(ctx)
|
||||
} else {
|
||||
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
|
||||
client, err = rcc.NewClient(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("get root coord client failed: %v", err))
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
type nopCloseQueryCoordClient struct {
|
||||
querypb.QueryCoordClient
|
||||
}
|
||||
|
||||
func (n *nopCloseQueryCoordClient) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type nopCloseDataCoordClient struct {
|
||||
datapb.DataCoordClient
|
||||
}
|
||||
|
||||
func (n *nopCloseDataCoordClient) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type nopCloseRootCoordClient struct {
|
||||
rootcoordpb.RootCoordClient
|
||||
}
|
||||
|
||||
func (n *nopCloseRootCoordClient) Close() error {
|
||||
return nil
|
||||
}
|
|
@ -1,74 +0,0 @@
|
|||
package coordclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
func TestRegistry(t *testing.T) {
|
||||
assert.False(t, enableLocal.EnableQueryCoord)
|
||||
assert.False(t, enableLocal.EnableDataCoord)
|
||||
assert.False(t, enableLocal.EnableRootCoord)
|
||||
|
||||
EnableLocalClientRole(&LocalClientRoleConfig{
|
||||
ServerType: typeutil.RootCoordRole,
|
||||
EnableQueryCoord: true,
|
||||
EnableDataCoord: true,
|
||||
EnableRootCoord: true,
|
||||
})
|
||||
assert.False(t, enableLocal.EnableQueryCoord)
|
||||
assert.False(t, enableLocal.EnableDataCoord)
|
||||
assert.False(t, enableLocal.EnableRootCoord)
|
||||
|
||||
RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{})
|
||||
RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{})
|
||||
RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{})
|
||||
assert.False(t, glocalClient.dataCoordClient.Ready())
|
||||
assert.False(t, glocalClient.queryCoordClient.Ready())
|
||||
assert.False(t, glocalClient.rootCoordClient.Ready())
|
||||
|
||||
enableLocal = &LocalClientRoleConfig{}
|
||||
|
||||
EnableLocalClientRole(&LocalClientRoleConfig{
|
||||
ServerType: typeutil.StandaloneRole,
|
||||
EnableQueryCoord: true,
|
||||
EnableDataCoord: true,
|
||||
EnableRootCoord: true,
|
||||
})
|
||||
assert.True(t, enableLocal.EnableDataCoord)
|
||||
assert.True(t, enableLocal.EnableQueryCoord)
|
||||
assert.True(t, enableLocal.EnableRootCoord)
|
||||
|
||||
RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{})
|
||||
RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{})
|
||||
RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{})
|
||||
assert.True(t, glocalClient.dataCoordClient.Ready())
|
||||
assert.True(t, glocalClient.queryCoordClient.Ready())
|
||||
assert.True(t, glocalClient.rootCoordClient.Ready())
|
||||
|
||||
enableLocal = &LocalClientRoleConfig{}
|
||||
|
||||
EnableLocalClientRole(&LocalClientRoleConfig{
|
||||
ServerType: typeutil.MixtureRole,
|
||||
EnableQueryCoord: true,
|
||||
EnableDataCoord: true,
|
||||
EnableRootCoord: true,
|
||||
})
|
||||
assert.True(t, enableLocal.EnableDataCoord)
|
||||
assert.True(t, enableLocal.EnableQueryCoord)
|
||||
assert.True(t, enableLocal.EnableRootCoord)
|
||||
|
||||
assert.NotNil(t, GetQueryCoordClient(context.Background()))
|
||||
assert.NotNil(t, GetDataCoordClient(context.Background()))
|
||||
assert.NotNil(t, GetRootCoordClient(context.Background()))
|
||||
GetQueryCoordClient(context.Background()).Close()
|
||||
GetDataCoordClient(context.Background()).Close()
|
||||
GetRootCoordClient(context.Background()).Close()
|
||||
}
|
|
@ -35,11 +35,11 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
globalIDAllocator "github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/broker"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/dataview"
|
||||
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
|
||||
indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
|
||||
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/kv/tikv"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
||||
|
@ -254,7 +254,7 @@ func defaultIndexNodeCreatorFunc(ctx context.Context, addr string, nodeID int64)
|
|||
}
|
||||
|
||||
func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoordClient, error) {
|
||||
return coordclient.GetRootCoordClient(ctx), nil
|
||||
return rootcoordclient.NewClient(ctx)
|
||||
}
|
||||
|
||||
// QuitSignal returns signal when server quits
|
||||
|
|
|
@ -32,7 +32,6 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
|
||||
"github.com/milvus-io/milvus/internal/datacoord"
|
||||
"github.com/milvus-io/milvus/internal/distributed/utils"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -201,7 +200,6 @@ func (s *Server) startGrpcLoop() {
|
|||
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
|
||||
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
|
||||
datapb.RegisterDataCoordServer(s.grpcServer, s)
|
||||
coordclient.RegisterDataCoordServer(s)
|
||||
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
|
||||
if err := s.grpcServer.Serve(s.listener); err != nil {
|
||||
s.grpcErrChan <- err
|
||||
|
|
|
@ -31,7 +31,8 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
|
||||
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
|
||||
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
||||
"github.com/milvus-io/milvus/internal/distributed/utils"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
|
@ -168,7 +169,11 @@ func (s *Server) init() error {
|
|||
|
||||
// --- Master Server Client ---
|
||||
if s.rootCoord == nil {
|
||||
s.rootCoord = coordclient.GetRootCoordClient(s.loopCtx)
|
||||
s.rootCoord, err = rcc.NewClient(s.loopCtx)
|
||||
if err != nil {
|
||||
log.Error("QueryCoord try to new RootCoord client failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// wait for master init or healthy
|
||||
|
@ -186,7 +191,11 @@ func (s *Server) init() error {
|
|||
|
||||
// --- Data service client ---
|
||||
if s.dataCoord == nil {
|
||||
s.dataCoord = coordclient.GetDataCoordClient(s.loopCtx)
|
||||
s.dataCoord, err = dcc.NewClient(s.loopCtx)
|
||||
if err != nil {
|
||||
log.Error("QueryCoord try to new DataCoord client failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("QueryCoord try to wait for DataCoord ready")
|
||||
|
@ -249,7 +258,6 @@ func (s *Server) startGrpcLoop() {
|
|||
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
|
||||
)
|
||||
querypb.RegisterQueryCoordServer(s.grpcServer, s)
|
||||
coordclient.RegisterQueryCoordServer(s)
|
||||
|
||||
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
|
||||
if err := s.grpcServer.Serve(s.listener); err != nil {
|
||||
|
|
|
@ -31,7 +31,8 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
|
||||
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
|
||||
qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
|
||||
"github.com/milvus-io/milvus/internal/distributed/utils"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
|
@ -71,8 +72,8 @@ type Server struct {
|
|||
dataCoord types.DataCoordClient
|
||||
queryCoord types.QueryCoordClient
|
||||
|
||||
newDataCoordClient func(ctx context.Context) types.DataCoordClient
|
||||
newQueryCoordClient func(ctx context.Context) types.QueryCoordClient
|
||||
newDataCoordClient func() types.DataCoordClient
|
||||
newQueryCoordClient func() types.QueryCoordClient
|
||||
}
|
||||
|
||||
func (s *Server) DescribeDatabase(ctx context.Context, request *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) {
|
||||
|
@ -156,8 +157,21 @@ func (s *Server) Prepare() error {
|
|||
}
|
||||
|
||||
func (s *Server) setClient() {
|
||||
s.newDataCoordClient = coordclient.GetDataCoordClient
|
||||
s.newQueryCoordClient = coordclient.GetQueryCoordClient
|
||||
s.newDataCoordClient = func() types.DataCoordClient {
|
||||
dsClient, err := dcc.NewClient(s.ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return dsClient
|
||||
}
|
||||
|
||||
s.newQueryCoordClient = func() types.QueryCoordClient {
|
||||
qsClient, err := qcc.NewClient(s.ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return qsClient
|
||||
}
|
||||
}
|
||||
|
||||
// Run initializes and starts RootCoord's grpc service.
|
||||
|
@ -220,7 +234,7 @@ func (s *Server) init() error {
|
|||
|
||||
if s.newDataCoordClient != nil {
|
||||
log.Info("RootCoord start to create DataCoord client")
|
||||
dataCoord := s.newDataCoordClient(s.ctx)
|
||||
dataCoord := s.newDataCoordClient()
|
||||
s.dataCoord = dataCoord
|
||||
if err := s.rootCoord.SetDataCoordClient(dataCoord); err != nil {
|
||||
panic(err)
|
||||
|
@ -229,7 +243,7 @@ func (s *Server) init() error {
|
|||
|
||||
if s.newQueryCoordClient != nil {
|
||||
log.Info("RootCoord start to create QueryCoord client")
|
||||
queryCoord := s.newQueryCoordClient(s.ctx)
|
||||
queryCoord := s.newQueryCoordClient()
|
||||
s.queryCoord = queryCoord
|
||||
if err := s.rootCoord.SetQueryCoordClient(queryCoord); err != nil {
|
||||
panic(err)
|
||||
|
@ -291,7 +305,6 @@ func (s *Server) startGrpcLoop() {
|
|||
)),
|
||||
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
|
||||
rootcoordpb.RegisterRootCoordServer(s.grpcServer, s)
|
||||
coordclient.RegisterRootCoordServer(s)
|
||||
|
||||
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
|
||||
if err := s.grpcServer.Serve(s.listener); err != nil {
|
||||
|
|
|
@ -142,13 +142,13 @@ func TestRun(t *testing.T) {
|
|||
|
||||
mockDataCoord := mocks.NewMockDataCoordClient(t)
|
||||
mockDataCoord.EXPECT().Close().Return(nil)
|
||||
svr.newDataCoordClient = func(_ context.Context) types.DataCoordClient {
|
||||
svr.newDataCoordClient = func() types.DataCoordClient {
|
||||
return mockDataCoord
|
||||
}
|
||||
|
||||
mockQueryCoord := mocks.NewMockQueryCoordClient(t)
|
||||
mockQueryCoord.EXPECT().Close().Return(nil)
|
||||
svr.newQueryCoordClient = func(_ context.Context) types.QueryCoordClient {
|
||||
svr.newQueryCoordClient = func() types.QueryCoordClient {
|
||||
return mockQueryCoord
|
||||
}
|
||||
|
||||
|
@ -238,7 +238,7 @@ func TestServerRun_DataCoordClientInitErr(t *testing.T) {
|
|||
|
||||
mockDataCoord := mocks.NewMockDataCoordClient(t)
|
||||
mockDataCoord.EXPECT().Close().Return(nil)
|
||||
server.newDataCoordClient = func(_ context.Context) types.DataCoordClient {
|
||||
server.newDataCoordClient = func() types.DataCoordClient {
|
||||
return mockDataCoord
|
||||
}
|
||||
err = server.Prepare()
|
||||
|
@ -268,7 +268,7 @@ func TestServerRun_DataCoordClientStartErr(t *testing.T) {
|
|||
|
||||
mockDataCoord := mocks.NewMockDataCoordClient(t)
|
||||
mockDataCoord.EXPECT().Close().Return(nil)
|
||||
server.newDataCoordClient = func(_ context.Context) types.DataCoordClient {
|
||||
server.newDataCoordClient = func() types.DataCoordClient {
|
||||
return mockDataCoord
|
||||
}
|
||||
err = server.Prepare()
|
||||
|
@ -298,7 +298,7 @@ func TestServerRun_QueryCoordClientInitErr(t *testing.T) {
|
|||
|
||||
mockQueryCoord := mocks.NewMockQueryCoordClient(t)
|
||||
mockQueryCoord.EXPECT().Close().Return(nil)
|
||||
server.newQueryCoordClient = func(_ context.Context) types.QueryCoordClient {
|
||||
server.newQueryCoordClient = func() types.QueryCoordClient {
|
||||
return mockQueryCoord
|
||||
}
|
||||
err = server.Prepare()
|
||||
|
@ -328,7 +328,7 @@ func TestServer_QueryCoordClientStartErr(t *testing.T) {
|
|||
|
||||
mockQueryCoord := mocks.NewMockQueryCoordClient(t)
|
||||
mockQueryCoord.EXPECT().Close().Return(nil)
|
||||
server.newQueryCoordClient = func(_ context.Context) types.QueryCoordClient {
|
||||
server.newQueryCoordClient = func() types.QueryCoordClient {
|
||||
return mockQueryCoord
|
||||
}
|
||||
err = server.Prepare()
|
||||
|
|
|
@ -1,56 +0,0 @@
|
|||
package syncutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// Future is a future value that can be set and retrieved.
|
||||
type Future[T any] struct {
|
||||
ch chan struct{}
|
||||
value T
|
||||
}
|
||||
|
||||
// NewFuture creates a new future.
|
||||
func NewFuture[T any]() *Future[T] {
|
||||
return &Future[T]{
|
||||
ch: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Set sets the value of the future.
|
||||
func (f *Future[T]) Set(value T) {
|
||||
f.value = value
|
||||
close(f.ch)
|
||||
}
|
||||
|
||||
// GetWithContext retrieves the value of the future if set, otherwise block until set or the context is done.
|
||||
func (f *Future[T]) GetWithContext(ctx context.Context) (T, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
var val T
|
||||
return val, ctx.Err()
|
||||
case <-f.ch:
|
||||
return f.value, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves the value of the future if set, otherwise block until set.
|
||||
func (f *Future[T]) Get() T {
|
||||
<-f.ch
|
||||
return f.value
|
||||
}
|
||||
|
||||
// Done returns a channel that is closed when the future is set.
|
||||
func (f *Future[T]) Done() <-chan struct{} {
|
||||
return f.ch
|
||||
}
|
||||
|
||||
// Ready returns true if the future is set.
|
||||
func (f *Future[T]) Ready() bool {
|
||||
select {
|
||||
case <-f.ch:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
|
@ -1,51 +0,0 @@
|
|||
package syncutil
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFuture_SetAndGet(t *testing.T) {
|
||||
f := NewFuture[int]()
|
||||
go func() {
|
||||
time.Sleep(1 * time.Second) // Simulate some work
|
||||
f.Set(42)
|
||||
}()
|
||||
|
||||
val := f.Get()
|
||||
if val != 42 {
|
||||
t.Errorf("Expected value 42, got %d", val)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFuture_Done(t *testing.T) {
|
||||
f := NewFuture[string]()
|
||||
go func() {
|
||||
f.Set("done")
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-f.Done():
|
||||
// Success
|
||||
case <-time.After(20 * time.Millisecond):
|
||||
t.Error("Expected future to be done within 2 seconds")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFuture_Ready(t *testing.T) {
|
||||
f := NewFuture[float64]()
|
||||
go func() {
|
||||
time.Sleep(20 * time.Millisecond) // Simulate some work
|
||||
f.Set(3.14)
|
||||
}()
|
||||
|
||||
if f.Ready() {
|
||||
t.Error("Expected future not to be ready immediately")
|
||||
}
|
||||
|
||||
<-f.Done() // Wait for the future to be set
|
||||
|
||||
if !f.Ready() {
|
||||
t.Error("Expected future to be ready after being set")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue