fix: ctx cancel should be the last step when stopping the server (#31220)

issue: #31219

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/31240/head
jaime 2024-03-15 10:33:05 +08:00 committed by GitHub
parent ca8eee2c47
commit db79be3ae0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 88 additions and 61 deletions

View File

@ -1126,9 +1126,6 @@ func (s *Server) Stop() error {
s.cluster.Close()
logutil.Logger(s.ctx).Info("datacoord cluster stopped")
s.stopServerLoop()
logutil.Logger(s.ctx).Info("datacoord serverloop stopped")
if s.session != nil {
s.session.Stop()
}
@ -1136,6 +1133,9 @@ func (s *Server) Stop() error {
if s.icSession != nil {
s.icSession.Stop()
}
s.stopServerLoop()
logutil.Logger(s.ctx).Info("datacoord serverloop stopped")
logutil.Logger(s.ctx).Warn("datacoord stop successful")
return nil

View File

@ -426,8 +426,6 @@ func (node *DataNode) Stop() error {
node.stopOnce.Do(func() {
// https://github.com/milvus-io/milvus/issues/12282
node.UpdateStateCode(commonpb.StateCode_Abnormal)
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph
node.cancel()
node.eventManager.CloseAll()
@ -460,6 +458,7 @@ func (node *DataNode) Stop() error {
node.importManager.Close()
}
node.cancel()
node.stopWaiter.Wait()
})
return nil

View File

@ -61,7 +61,7 @@ type Server struct {
serverID atomic.Int64
wg sync.WaitGroup
grpcWG sync.WaitGroup
dataCoord types.DataCoordComponent
etcdCli *clientv3.Client
@ -135,7 +135,7 @@ func (s *Server) init() error {
func (s *Server) startGrpc() error {
Params := &paramtable.Get().DataCoordGrpcServerCfg
s.wg.Add(1)
s.grpcWG.Add(1)
go s.startGrpcLoop(Params.Port.GetAsInt())
// wait for grpc server loop start
err := <-s.grpcErrChan
@ -144,7 +144,7 @@ func (s *Server) startGrpc() error {
func (s *Server) startGrpcLoop(grpcPort int) {
defer logutil.LogPanic()
defer s.wg.Done()
defer s.grpcWG.Done()
Params := &paramtable.Get().DataCoordGrpcServerCfg
log.Debug("network port", zap.Int("port", grpcPort))
@ -229,8 +229,6 @@ func (s *Server) Stop() (err error) {
logger.Info("Datacoord stopped", zap.Error(err))
}()
s.cancel()
if s.etcdCli != nil {
defer s.etcdCli.Close()
}
@ -240,14 +238,16 @@ func (s *Server) Stop() (err error) {
if s.grpcServer != nil {
utils.GracefulStopGRPCServer(s.grpcServer)
}
s.grpcWG.Wait()
logger.Info("internal server[dataCoord] start to stop")
err = s.dataCoord.Stop()
if err != nil {
log.Error("failed to close dataCoord", zap.Error(err))
return err
}
s.wg.Wait()
s.cancel()
return nil
}

View File

@ -57,7 +57,7 @@ import (
type Server struct {
datanode types.DataNodeComponent
wg sync.WaitGroup
grpcWG sync.WaitGroup
grpcErrChan chan error
grpcServer *grpc.Server
ctx context.Context
@ -97,7 +97,7 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
func (s *Server) startGrpc() error {
Params := &paramtable.Get().DataNodeGrpcServerCfg
s.wg.Add(1)
s.grpcWG.Add(1)
go s.startGrpcLoop(Params.Port.GetAsInt())
// wait for grpc server loop start
err := <-s.grpcErrChan
@ -106,7 +106,7 @@ func (s *Server) startGrpc() error {
// startGrpcLoop starts the grep loop of datanode component.
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
defer s.grpcWG.Done()
Params := &paramtable.Get().DataNodeGrpcServerCfg
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@ -202,24 +202,26 @@ func (s *Server) Run() error {
func (s *Server) Stop() (err error) {
Params := &paramtable.Get().DataNodeGrpcServerCfg
logger := log.With(zap.String("address", Params.GetAddress()))
logger.Info("Datanode stopping")
logger.Info("datanode stopping")
defer func() {
logger.Info("Datanode stopped", zap.Error(err))
logger.Info("datanode stopped", zap.Error(err))
}()
s.cancel()
if s.etcdCli != nil {
defer s.etcdCli.Close()
}
if s.grpcServer != nil {
utils.GracefulStopGRPCServer(s.grpcServer)
}
s.grpcWG.Wait()
logger.Info("internal server[datanode] start to stop")
err = s.datanode.Stop()
if err != nil {
log.Error("failed to close datanode", zap.Error(err))
return err
}
s.wg.Wait()
s.cancel()
return nil
}

View File

@ -405,11 +405,4 @@ func Test_Run(t *testing.T) {
err = server.Run()
assert.Error(t, err)
server.datanode = &MockDataNode{
stopErr: errors.New("error"),
}
err = server.Stop()
assert.Error(t, err)
}

View File

@ -61,7 +61,7 @@ type Server struct {
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
grpcWG sync.WaitGroup
etcdCli *clientv3.Client
}
@ -81,7 +81,7 @@ func (s *Server) Run() error {
// startGrpcLoop starts the grep loop of IndexNode component.
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.loopWg.Done()
defer s.grpcWG.Done()
Params := &paramtable.Get().IndexNodeGrpcServerCfg
log.Debug("IndexNode", zap.String("network address", Params.GetAddress()), zap.Int("network port: ", grpcPort))
@ -159,7 +159,7 @@ func (s *Server) init() error {
}
}()
s.loopWg.Add(1)
s.grpcWG.Add(1)
go s.startGrpcLoop(Params.Port.GetAsInt())
// wait for grpc server loop start
err = <-s.grpcErrChan
@ -220,17 +220,21 @@ func (s *Server) Stop() (err error) {
}()
if s.indexnode != nil {
s.indexnode.Stop()
err := s.indexnode.Stop()
if err != nil {
log.Error("failed to close indexnode", zap.Error(err))
return err
}
}
s.loopCancel()
if s.etcdCli != nil {
defer s.etcdCli.Close()
}
if s.grpcServer != nil {
utils.GracefulStopGRPCServer(s.grpcServer)
}
s.loopWg.Wait()
s.grpcWG.Wait()
s.loopCancel()
return nil
}

View File

@ -745,8 +745,10 @@ func (s *Server) Stop() (err error) {
s.wg.Wait()
logger.Info("internal server[proxy] start to stop")
err = s.proxy.Stop()
if err != nil {
log.Error("failed to close proxy", zap.Error(err))
return err
}

View File

@ -57,7 +57,7 @@ import (
// Server is the grpc server of QueryCoord.
type Server struct {
wg sync.WaitGroup
grpcWG sync.WaitGroup
loopCtx context.Context
loopCancel context.CancelFunc
grpcServer *grpc.Server
@ -147,7 +147,7 @@ func (s *Server) init() error {
log.Info("Connected to tikv. Using tikv as metadata storage.")
}
s.wg.Add(1)
s.grpcWG.Add(1)
go s.startGrpcLoop(rpcParams.Port.GetAsInt())
// wait for grpc server loop start
err = <-s.grpcErrChan
@ -204,7 +204,7 @@ func (s *Server) init() error {
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
defer s.grpcWG.Done()
Params := &paramtable.Get().QueryCoordGrpcServerCfg
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@ -283,11 +283,18 @@ func (s *Server) Stop() (err error) {
if s.etcdCli != nil {
defer s.etcdCli.Close()
}
s.loopCancel()
if s.grpcServer != nil {
utils.GracefulStopGRPCServer(s.grpcServer)
}
return s.queryCoord.Stop()
s.grpcWG.Wait()
logger.Info("internal server[queryCoord] start to stop")
if err := s.queryCoord.Stop(); err != nil {
log.Error("failed to close queryCoord", zap.Error(err))
}
s.loopCancel()
return nil
}
// SetRootCoord sets root coordinator's client

View File

@ -58,7 +58,7 @@ type UniqueID = typeutil.UniqueID
// Server is the grpc server of QueryNode.
type Server struct {
querynode types.QueryNodeComponent
wg sync.WaitGroup
grpcWG sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
grpcErrChan chan error
@ -118,7 +118,7 @@ func (s *Server) init() error {
s.SetEtcdClient(etcdCli)
s.querynode.SetAddress(Params.GetAddress())
log.Debug("QueryNode connect to etcd successfully")
s.wg.Add(1)
s.grpcWG.Add(1)
go s.startGrpcLoop(Params.Port.GetAsInt())
// wait for grpc server loop start
err = <-s.grpcErrChan
@ -152,7 +152,7 @@ func (s *Server) start() error {
// startGrpcLoop starts the grpc loop of QueryNode component.
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
defer s.grpcWG.Done()
Params := &paramtable.Get().QueryNodeGrpcServerCfg
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@ -245,19 +245,22 @@ func (s *Server) Stop() (err error) {
logger.Info("QueryNode stopped", zap.Error(err))
}()
logger.Info("internal server[querynode] start to stop")
err = s.querynode.Stop()
if err != nil {
log.Error("failed to close querynode", zap.Error(err))
return err
}
if s.etcdCli != nil {
defer s.etcdCli.Close()
}
s.cancel()
if s.grpcServer != nil {
utils.GracefulStopGRPCServer(s.grpcServer)
}
s.wg.Wait()
s.grpcWG.Wait()
s.cancel()
return nil
}

View File

@ -61,7 +61,7 @@ type Server struct {
grpcServer *grpc.Server
grpcErrChan chan error
wg sync.WaitGroup
grpcWG sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
@ -234,7 +234,7 @@ func (s *Server) init() error {
}
func (s *Server) startGrpc(port int) error {
s.wg.Add(1)
s.grpcWG.Add(1)
go s.startGrpcLoop(port)
// wait for grpc server loop start
err := <-s.grpcErrChan
@ -242,7 +242,7 @@ func (s *Server) startGrpc(port int) error {
}
func (s *Server) startGrpcLoop(port int) {
defer s.wg.Done()
defer s.grpcWG.Done()
Params := &paramtable.Get().RootCoordGrpcServerCfg
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@ -330,11 +330,10 @@ func (s *Server) Stop() (err error) {
defer s.tikvCli.Close()
}
s.cancel()
if s.grpcServer != nil {
utils.GracefulStopGRPCServer(s.grpcServer)
}
s.wg.Wait()
s.grpcWG.Wait()
if s.dataCoord != nil {
if err := s.dataCoord.Close(); err != nil {
@ -347,10 +346,13 @@ func (s *Server) Stop() (err error) {
}
}
if s.rootCoord != nil {
logger.Info("internal server[rootCoord] start to stop")
if err := s.rootCoord.Stop(); err != nil {
log.Error("Failed to close close rootCoord", zap.Error(err))
log.Error("Failed to close rootCoord", zap.Error(err))
}
}
s.cancel()
return nil
}

View File

@ -257,7 +257,6 @@ func (i *IndexNode) Stop() error {
task.cancel()
}
}
i.loopCancel()
if i.sched != nil {
i.sched.Close()
}
@ -266,6 +265,7 @@ func (i *IndexNode) Stop() error {
}
i.CloseSegcore()
i.loopCancel()
log.Info("Index node stopped.")
})
return nil

View File

@ -451,11 +451,6 @@ func (s *Server) startServerLoop() {
}
func (s *Server) Stop() error {
// stop the components from outside to inside,
// to make the dependencies stopped working properly,
// cancel the server context first to stop receiving requests
s.cancel()
// FOLLOW the dependence graph:
// job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session
// observers -> dist controller
@ -503,6 +498,7 @@ func (s *Server) Stop() error {
s.session.Stop()
}
s.cancel()
s.wg.Wait()
log.Info("QueryCoord stop successfully")
return nil

View File

@ -476,8 +476,7 @@ func (node *QueryNode) Stop() error {
if node.pipelineManager != nil {
node.pipelineManager.Close()
}
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the pipeline
node.cancel()
if node.session != nil {
node.session.Stop()
}
@ -489,6 +488,9 @@ func (node *QueryNode) Stop() error {
}
node.CloseSegcore()
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the pipeline
node.cancel()
})
return nil
}

View File

@ -772,7 +772,7 @@ func (c *Core) revokeSession() {
if c.session != nil {
// wait at most one second to revoke
c.session.Stop()
log.Info("revoke rootcoord session")
log.Info("rootcoord session stop")
}
}
@ -784,12 +784,13 @@ func (c *Core) Stop() error {
if c.proxyWatcher != nil {
c.proxyWatcher.Stop()
}
c.cancelIfNotNil()
if c.quotaCenter != nil {
c.quotaCenter.stop()
}
c.wg.Wait()
c.revokeSession()
c.cancelIfNotNil()
c.wg.Wait()
return nil
}

View File

@ -151,6 +151,8 @@ type Session struct {
sessionTTL int64
sessionRetryTimes int64
reuseNodeID bool
isStopped atomic.Bool // set to true if stop method is invoked
}
type SessionOption func(session *Session)
@ -239,6 +241,7 @@ func NewSessionWithEtcd(ctx context.Context, metaRoot string, client *clientv3.C
sessionTTL: paramtable.Get().CommonCfg.SessionTTL.GetAsInt64(),
sessionRetryTimes: paramtable.Get().CommonCfg.SessionRetryTimes.GetAsInt64(),
reuseNodeID: true,
isStopped: *atomic.NewBool(false),
}
// integration test create cluster with different nodeId in one process
@ -861,7 +864,8 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
if callback != nil {
// before exit liveness check, callback to exit the session owner
defer func() {
if ctx.Err() == nil {
// the callback method will not be invoked if session is stopped.
if ctx.Err() == nil && !s.isStopped.Load() {
go callback()
}
}()
@ -940,6 +944,7 @@ func (s *Session) deleteSession() bool {
}
func (s *Session) Stop() {
s.isStopped.Store(true)
s.Revoke(time.Second)
s.cancelKeepAlive()
s.deleteSession()
@ -951,17 +956,28 @@ func (s *Session) Revoke(timeout time.Duration) {
if s == nil {
return
}
log.Info("start to revoke session", zap.String("sessionKey", s.activeKey))
if s.etcdCli == nil || s.LeaseID == nil {
log.Warn("skip remove session",
zap.String("sessionKey", s.activeKey),
zap.Bool("etcdCliIsNil", s.etcdCli == nil),
zap.Bool("LeaseIDIsNil", s.LeaseID == nil),
)
return
}
if s.Disconnected() {
log.Warn("skip remove session, connection is disconnected", zap.String("sessionKey", s.activeKey))
return
}
// can NOT use s.ctx, it may be Done here
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// ignores resp & error, just do best effort to revoke
_, _ = s.etcdCli.Revoke(ctx, *s.LeaseID)
_, err := s.etcdCli.Revoke(ctx, *s.LeaseID)
if err != nil {
log.Warn("failed to revoke session", zap.String("sessionKey", s.activeKey), zap.Error(err))
}
log.Info("revoke session successfully", zap.String("sessionKey", s.activeKey))
}
// UpdateRegistered update the state of registered.