Fix ut data race (#16939)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/16907/head
Jiquan Long 2022-05-13 17:43:53 +08:00 committed by GitHub
parent a4038d9f0c
commit ca3ecec5b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 37 additions and 35 deletions

View File

@ -86,8 +86,6 @@ type Server struct {
// avoid race
httpServerMtx sync.Mutex
grpcErrChan chan error
etcdCli *clientv3.Client
rootCoordClient types.RootCoord
dataCoordClient types.DataCoord
@ -103,8 +101,7 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
var err error
server := &Server{
ctx: ctx,
grpcErrChan: make(chan error),
ctx: ctx,
}
server.proxy, err = proxy.NewProxy(server.ctx, factory)
@ -145,13 +142,17 @@ func (s *Server) startHTTPServer(port int) {
}
}
func (s *Server) startRPCServer(grpcPort, grpcInternalPort int) {
s.wg.Add(2)
go s.startInternalGrpc(grpcInternalPort)
go s.startExternalGrpc(grpcPort)
func (s *Server) startInternalRPCServer(grpcInternalPort int, errChan chan error) {
s.wg.Add(1)
go s.startInternalGrpc(grpcInternalPort, errChan)
}
func (s *Server) startExternalGrpc(grpcPort int) {
func (s *Server) startExternalRPCServer(grpcExternalPort int, errChan chan error) {
s.wg.Add(1)
go s.startExternalGrpc(grpcExternalPort, errChan)
}
func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
defer s.wg.Done()
var kaep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@ -167,13 +168,11 @@ func (s *Server) startExternalGrpc(grpcPort int) {
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Warn("Proxy server failed to listen on", zap.Error(err), zap.Int("port", grpcPort))
s.grpcErrChan <- err
errChan <- err
return
}
log.Debug("Proxy server already listen on tcp", zap.Int("port", grpcPort))
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
opts := trace.GetInterceptorOpts()
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
@ -219,21 +218,19 @@ func (s *Server) startExternalGrpc(grpcPort int) {
proxypb.RegisterProxyServer(s.grpcExternalServer, s)
milvuspb.RegisterMilvusServiceServer(s.grpcExternalServer, s)
grpc_health_v1.RegisterHealthServer(s.grpcExternalServer, s)
errChan <- nil
log.Debug("create Proxy grpc server",
zap.Any("enforcement policy", kaep),
zap.Any("server parameters", kasp))
log.Debug("waiting for Proxy grpc server to be ready")
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
log.Debug("Proxy grpc server has been ready, serve grpc requests on listen")
if err := s.grpcExternalServer.Serve(lis); err != nil {
log.Warn("failed to serve on Proxy's listener", zap.Error(err))
s.grpcErrChan <- err
log.Error("failed to serve on Proxy's listener", zap.Error(err))
panic(err)
}
}
func (s *Server) startInternalGrpc(grpcPort int) {
func (s *Server) startInternalGrpc(grpcPort int, errChan chan error) {
defer s.wg.Done()
var kaep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@ -249,14 +246,11 @@ func (s *Server) startInternalGrpc(grpcPort int) {
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Warn("Proxy internal server failed to listen on", zap.Error(err), zap.Int("port", grpcPort))
s.grpcErrChan <- err
errChan <- err
return
}
log.Debug("Proxy internal server already listen on tcp", zap.Int("port", grpcPort))
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
opts := trace.GetInterceptorOpts()
s.grpcInternalServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
@ -275,17 +269,15 @@ func (s *Server) startInternalGrpc(grpcPort int) {
proxypb.RegisterProxyServer(s.grpcInternalServer, s)
milvuspb.RegisterMilvusServiceServer(s.grpcInternalServer, s)
grpc_health_v1.RegisterHealthServer(s.grpcInternalServer, s)
errChan <- nil
log.Debug("create Proxy internal grpc server",
zap.Any("enforcement policy", kaep),
zap.Any("server parameters", kasp))
log.Debug("waiting for Proxy internal grpc server to be ready")
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
log.Debug("Proxy internal grpc server has been ready, serve grpc requests on listen")
if err := s.grpcInternalServer.Serve(lis); err != nil {
log.Warn("failed to internal serve on Proxy's listener", zap.Error(err))
s.grpcErrChan <- err
log.Error("failed to internal serve on Proxy's listener", zap.Error(err))
panic(err)
}
}
@ -335,13 +327,23 @@ func (s *Server) init() error {
}
s.etcdCli = etcdCli
s.proxy.SetEtcdClient(s.etcdCli)
s.startRPCServer(Params.Port, Params.InternalPort)
log.Debug("waiting for grpc server of Proxy to be started")
if err := <-s.grpcErrChan; err != nil {
log.Warn("failed to start Proxy's grpc server", zap.Error(err))
return err
errChan := make(chan error, 1)
{
s.startInternalRPCServer(Params.InternalPort, errChan)
if err := <-errChan; err != nil {
log.Error("failed to create internal rpc server", zap.Error(err))
return err
}
}
log.Debug("grpc server of proxy has been started")
{
s.startExternalRPCServer(Params.Port, errChan)
if err := <-errChan; err != nil {
log.Error("failed to create external rpc server", zap.Error(err))
return err
}
}
if HTTPParams.Enabled {
log.Info("start http server of proxy", zap.Int("port", HTTPParams.Port))
s.wg.Add(1)