mirror of https://github.com/milvus-io/milvus.git
Fix panic when stop rootcoord (#20019)
Signed-off-by: longjiquan <jiquan.long@zilliz.com> Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/20041/head
parent
2f19e6595a
commit
7a13b1c329
|
@ -302,6 +302,7 @@ func newTestCore(opts ...Opt) *Core {
|
|||
// no schedule, execute directly.
|
||||
s.Execute(context.Background())
|
||||
}
|
||||
executor.StopFunc = func() {}
|
||||
c.stepExecutor = executor
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
|
@ -737,6 +738,7 @@ type mockScheduler struct {
|
|||
IScheduler
|
||||
AddTaskFunc func(t task) error
|
||||
GetMinDdlTsFunc func() Timestamp
|
||||
StopFunc func()
|
||||
minDdlTs Timestamp
|
||||
}
|
||||
|
||||
|
@ -758,6 +760,12 @@ func (m mockScheduler) GetMinDdlTs() Timestamp {
|
|||
return m.minDdlTs
|
||||
}
|
||||
|
||||
func (m mockScheduler) Stop() {
|
||||
if m.StopFunc != nil {
|
||||
m.StopFunc()
|
||||
}
|
||||
}
|
||||
|
||||
func withScheduler(sched IScheduler) Opt {
|
||||
return func(c *Core) {
|
||||
c.scheduler = sched
|
||||
|
@ -770,6 +778,7 @@ func withValidScheduler() Opt {
|
|||
t.NotifyDone(nil)
|
||||
return nil
|
||||
}
|
||||
sched.StopFunc = func() {}
|
||||
return withScheduler(sched)
|
||||
}
|
||||
|
||||
|
|
|
@ -169,6 +169,7 @@ func NewCore(c context.Context, factory dependency.Factory) (*Core, error) {
|
|||
// UpdateStateCode update state code
|
||||
func (c *Core) UpdateStateCode(code commonpb.StateCode) {
|
||||
c.stateCode.Store(code)
|
||||
log.Info("update rootcoord state", zap.String("state", code.String()))
|
||||
}
|
||||
|
||||
func (c *Core) checkHealthy() (commonpb.StateCode, bool) {
|
||||
|
@ -657,17 +658,43 @@ func (c *Core) Start() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (c *Core) stopExecutor() {
|
||||
if c.stepExecutor != nil {
|
||||
c.stepExecutor.Stop()
|
||||
log.Info("stop rootcoord executor")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Core) stopScheduler() {
|
||||
if c.scheduler != nil {
|
||||
c.scheduler.Stop()
|
||||
log.Info("stop rootcoord scheduler")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Core) cancelIfNotNil() {
|
||||
if c.cancel != nil {
|
||||
c.cancel()
|
||||
log.Info("cancel rootcoord goroutines")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Core) revokeSession() {
|
||||
if c.session != nil {
|
||||
// wait at most one second to revoke
|
||||
c.session.Revoke(time.Second)
|
||||
log.Info("revoke rootcoord session")
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops rootCoord.
|
||||
func (c *Core) Stop() error {
|
||||
c.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
|
||||
c.stepExecutor.Stop()
|
||||
c.scheduler.Stop()
|
||||
|
||||
c.cancel()
|
||||
c.stopExecutor()
|
||||
c.stopScheduler()
|
||||
c.cancelIfNotNil()
|
||||
c.wg.Wait()
|
||||
// wait at most one second to revoke
|
||||
c.session.Revoke(time.Second)
|
||||
c.revokeSession()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1449,3 +1449,25 @@ func TestRootCoord_CheckHealth(t *testing.T) {
|
|||
assert.NotEmpty(t, resp.Reasons)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCore_Stop(t *testing.T) {
|
||||
t.Run("abnormal stop before component is ready", func(t *testing.T) {
|
||||
c := &Core{}
|
||||
err := c.Stop()
|
||||
assert.NoError(t, err)
|
||||
code, ok := c.stateCode.Load().(commonpb.StateCode)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, commonpb.StateCode_Abnormal, code)
|
||||
})
|
||||
|
||||
t.Run("normal case", func(t *testing.T) {
|
||||
c := newTestCore(withHealthyCode(),
|
||||
withValidScheduler())
|
||||
c.ctx, c.cancel = context.WithCancel(context.Background())
|
||||
err := c.Stop()
|
||||
assert.NoError(t, err)
|
||||
code, ok := c.stateCode.Load().(commonpb.StateCode)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, commonpb.StateCode_Abnormal, code)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue