Fix data race of proxy grpc client (#8091)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/8114/head
dragondriver 2021-09-16 19:31:59 +08:00 committed by GitHub
parent 085fe95da1
commit ca2d09a167
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 6 deletions

View File

@ -15,6 +15,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@ -36,8 +37,9 @@ type Client struct {
ctx context.Context
cancel context.CancelFunc
grpcClient proxypb.ProxyClient
conn *grpc.ClientConn
grpcClient proxypb.ProxyClient
conn *grpc.ClientConn
grpcClientMtx sync.RWMutex
addr string
}
@ -55,6 +57,13 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
}, nil
}
func (c *Client) getGrpcClient() proxypb.ProxyClient {
c.grpcClientMtx.RLock()
defer c.grpcClientMtx.RUnlock()
return c.grpcClient
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
@ -101,7 +110,11 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
return err
}
log.Debug("ProxyClient connect success")
c.grpcClientMtx.Lock()
defer c.grpcClientMtx.Unlock()
c.grpcClient = proxypb.NewProxyClient(c.conn)
return nil
}
@ -137,28 +150,28 @@ func (c *Client) Register() error {
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
return c.getGrpcClient().GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
})
return ret.(*internalpb.ComponentStates), err
}
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
return c.getGrpcClient().GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
})
return ret.(*milvuspb.StringResponse), err
}
func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.InvalidateCollectionMetaCache(ctx, req)
return c.getGrpcClient().InvalidateCollectionMetaCache(ctx, req)
})
return ret.(*commonpb.Status), err
}
func (c *Client) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.ReleaseDQLMessageStream(ctx, req)
return c.getGrpcClient().ReleaseDQLMessageStream(ctx, req)
})
return ret.(*commonpb.Status), err
}