diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 2b03e26879..f8e4b763ad 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -36,8 +37,9 @@ type Client struct { ctx context.Context cancel context.CancelFunc - grpcClient proxypb.ProxyClient - conn *grpc.ClientConn + grpcClient proxypb.ProxyClient + conn *grpc.ClientConn + grpcClientMtx sync.RWMutex addr string } @@ -55,6 +57,13 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { }, nil } +func (c *Client) getGrpcClient() proxypb.ProxyClient { + c.grpcClientMtx.RLock() + defer c.grpcClientMtx.RUnlock() + + return c.grpcClient +} + func (c *Client) Init() error { Params.Init() return c.connect(retry.Attempts(20)) @@ -101,7 +110,11 @@ func (c *Client) connect(retryOptions ...retry.Option) error { return err } log.Debug("ProxyClient connect success") + + c.grpcClientMtx.Lock() + defer c.grpcClientMtx.Unlock() c.grpcClient = proxypb.NewProxyClient(c.conn) + return nil } @@ -137,28 +150,28 @@ func (c *Client) Register() error { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) + return c.getGrpcClient().GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) }) return ret.(*internalpb.ComponentStates), err } func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) + return c.getGrpcClient().GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) }) return ret.(*milvuspb.StringResponse), err } func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.InvalidateCollectionMetaCache(ctx, req) + return c.getGrpcClient().InvalidateCollectionMetaCache(ctx, req) }) return ret.(*commonpb.Status), err } func (c *Client) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) { ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.ReleaseDQLMessageStream(ctx, req) + return c.getGrpcClient().ReleaseDQLMessageStream(ctx, req) }) return ret.(*commonpb.Status), err }