Refine signal handler for whole milvus role lifetime (#26642)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/26689/head
congqixia 2023-08-29 19:28:28 +08:00 committed by GitHub
parent b2d980ce3c
commit 073ac8350b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 17 deletions

View File

@ -56,7 +56,7 @@ func (c *run) execute(args []string, flags *flag.FlagSet) {
signal.Ignore(syscall.SIGPIPE)
var local = false
role := roles.MilvusRoles{}
role := roles.NewMilvusRoles()
switch c.serverType {
case typeutil.RootCoordRole:
role.EnableRootCoord = true

View File

@ -93,9 +93,8 @@ func runComponent[T component](ctx context.Context,
metricRegister func(*prometheus.Registry),
) T {
var role T
var wg sync.WaitGroup
wg.Add(1)
sign := make(chan struct{})
go func() {
factory := dependency.NewFactory(localMsg)
var err error
@ -108,13 +107,14 @@ func runComponent[T component](ctx context.Context,
if err != nil {
panic(err)
}
wg.Done()
close(sign)
if err := role.Run(); err != nil {
panic(err)
}
runWg.Done()
}()
wg.Wait()
<-sign
healthz.Register(role)
metricRegister(Registry.GoRegistry)
@ -131,6 +131,17 @@ type MilvusRoles struct {
EnableDataNode bool `env:"ENABLE_DATA_NODE"`
EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"`
EnableIndexNode bool `env:"ENABLE_INDEX_NODE"`
closed chan struct{}
once sync.Once
}
// NewMilvusRoles creates a new MilvusRoles with private fields initialized.
func NewMilvusRoles() *MilvusRoles {
mr := &MilvusRoles{
closed: make(chan struct{}),
}
return mr
}
// EnvValue not used now.
@ -234,14 +245,48 @@ func setupPrometheusHTTPServer(r *internalmetrics.MilvusRegistry) {
})
}
func (mr *MilvusRoles) handleSignals() func() {
sign := make(chan struct{})
done := make(chan struct{})
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
go func() {
defer close(done)
for {
select {
case <-sign:
log.Info("All cleanup done, handleSignals goroutine quit")
return
case sig := <-sc:
log.Warn("Get signal to exit", zap.String("signal", sig.String()))
mr.once.Do(func() {
close(mr.closed)
})
}
}
}()
return func() {
close(sign)
<-done
}
}
// Run Milvus components.
func (mr *MilvusRoles) Run(local bool, alias string) {
// start signal handler, defer close func
closeFn := mr.handleSignals()
defer closeFn()
log.Info("starting running Milvus components")
ctx, cancel := context.WithCancel(context.Background())
defer func() {
// some deferred Stop has race with context cancel
cancel()
}()
defer cancel()
mr.printLDPreLoad()
// only standalone enable localMsg
@ -346,12 +391,5 @@ func (mr *MilvusRoles) Run(local bool, alias string) {
paramtable.SetCreateTime(time.Now())
paramtable.SetUpdateTime(time.Now())
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
log.Error("Get signal to exit\n", zap.String("signal", sig.String()))
<-mr.closed
}