From b0923f1299fea669380456e8175f971576d244af Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 23 Feb 2022 11:09:52 +0800 Subject: [PATCH] Fix DataCoord panics if message queue service quits before it (#15702) Signed-off-by: Congqi Xia --- internal/datacoord/server.go | 85 +++++++++++++++++-------------- internal/datacoord/server_test.go | 37 ++++++++++++++ 2 files changed, 85 insertions(+), 37 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 1c997ba791..04f1a2db63 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -455,47 +455,58 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName)) ttMsgStream.Start() - go func() { - var checker *timerecord.LongTermChecker - if enableTtChecker { - checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg) - checker.Start() - defer checker.Stop() - } + go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream) +} - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - defer ttMsgStream.Close() - for { - select { - case <-ctx.Done(): - log.Debug("DataNode timetick loop shutdown") - return - default: - } - msgPack := ttMsgStream.Consume() - if msgPack == nil { - log.Debug("receive nil timetick msg and shutdown timetick channel") - return - } - for _, msg := range msgPack.Msgs { - ttMsg, ok := msg.(*msgstream.DataNodeTtMsg) - if !ok { - log.Warn("receive unexpected msg type from tt channel") - continue - } - if enableTtChecker { - checker.Check() - } +func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStream msgstream.MsgStream) { + var checker *timerecord.LongTermChecker + if enableTtChecker { + checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg) + checker.Start() + defer checker.Stop() + } - if err := s.handleTimetickMessage(ctx, ttMsg); err != nil { - log.Error("failed to handle timetick message", zap.Error(err)) - continue - } + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + defer func() { + // https://github.com/milvus-io/milvus/issues/15659 + // msgstream service closed before datacoord quits + defer func() { + if x := recover(); x != nil { + log.Error("Failed to close ttMessage", zap.Any("recovered", x)) } - s.helper.eventAfterHandleDataNodeTt() - } + }() + ttMsgStream.Close() }() + for { + select { + case <-ctx.Done(): + log.Debug("DataNode timetick loop shutdown") + return + default: + } + msgPack := ttMsgStream.Consume() + if msgPack == nil { + log.Debug("receive nil timetick msg and shutdown timetick channel") + return + } + for _, msg := range msgPack.Msgs { + ttMsg, ok := msg.(*msgstream.DataNodeTtMsg) + if !ok { + log.Warn("receive unexpected msg type from tt channel") + continue + } + if enableTtChecker { + checker.Check() + } + + if err := s.handleTimetickMessage(ctx, ttMsg); err != nil { + log.Error("failed to handle timetick message", zap.Error(err)) + continue + } + } + s.helper.eventAfterHandleDataNodeTt() + } } func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 1e942389cc..7866030ce5 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -2247,6 +2248,42 @@ func TestGetFlushState(t *testing.T) { }) } +// https://github.com/milvus-io/milvus/issues/15659 +func TestIssue15659(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s := &Server{ + helper: ServerHelper{ + eventAfterHandleDataNodeTt: func() {}, + }, + } + ms := &MockClosePanicMsgstream{} + ms.On("Consume").Return(&msgstream.MsgPack{}) + ch := make(chan struct{}) + go func() { + assert.NotPanics(t, func() { + s.serverLoopWg.Add(1) + s.handleDataNodeTimetickMsgstream(ctx, ms) + close(ch) + }) + }() + cancel() + <-ch +} + +type MockClosePanicMsgstream struct { + mock.Mock + msgstream.MsgStream +} + +func (ms *MockClosePanicMsgstream) Close() { + panic("mocked close panic") +} + +func (ms *MockClosePanicMsgstream) Consume() *msgstream.MsgPack { + args := ms.Called() + return args.Get(0).(*msgstream.MsgPack) +} + func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server { Params.Init() Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())