mirror of https://github.com/milvus-io/milvus.git
Fix DataCoord panics if message queue service quits before it (#15702)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/15709/head
parent
54b8b24151
commit
b0923f1299
|
@ -455,47 +455,58 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
|
||||||
zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName))
|
zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName))
|
||||||
ttMsgStream.Start()
|
ttMsgStream.Start()
|
||||||
|
|
||||||
go func() {
|
go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream)
|
||||||
var checker *timerecord.LongTermChecker
|
}
|
||||||
if enableTtChecker {
|
|
||||||
checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
|
|
||||||
checker.Start()
|
|
||||||
defer checker.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
defer logutil.LogPanic()
|
func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStream msgstream.MsgStream) {
|
||||||
defer s.serverLoopWg.Done()
|
var checker *timerecord.LongTermChecker
|
||||||
defer ttMsgStream.Close()
|
if enableTtChecker {
|
||||||
for {
|
checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
|
||||||
select {
|
checker.Start()
|
||||||
case <-ctx.Done():
|
defer checker.Stop()
|
||||||
log.Debug("DataNode timetick loop shutdown")
|
}
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
msgPack := ttMsgStream.Consume()
|
|
||||||
if msgPack == nil {
|
|
||||||
log.Debug("receive nil timetick msg and shutdown timetick channel")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, msg := range msgPack.Msgs {
|
|
||||||
ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
|
|
||||||
if !ok {
|
|
||||||
log.Warn("receive unexpected msg type from tt channel")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if enableTtChecker {
|
|
||||||
checker.Check()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
|
defer logutil.LogPanic()
|
||||||
log.Error("failed to handle timetick message", zap.Error(err))
|
defer s.serverLoopWg.Done()
|
||||||
continue
|
defer func() {
|
||||||
}
|
// https://github.com/milvus-io/milvus/issues/15659
|
||||||
|
// msgstream service closed before datacoord quits
|
||||||
|
defer func() {
|
||||||
|
if x := recover(); x != nil {
|
||||||
|
log.Error("Failed to close ttMessage", zap.Any("recovered", x))
|
||||||
}
|
}
|
||||||
s.helper.eventAfterHandleDataNodeTt()
|
}()
|
||||||
}
|
ttMsgStream.Close()
|
||||||
}()
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Debug("DataNode timetick loop shutdown")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
msgPack := ttMsgStream.Consume()
|
||||||
|
if msgPack == nil {
|
||||||
|
log.Debug("receive nil timetick msg and shutdown timetick channel")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, msg := range msgPack.Msgs {
|
||||||
|
ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
|
||||||
|
if !ok {
|
||||||
|
log.Warn("receive unexpected msg type from tt channel")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if enableTtChecker {
|
||||||
|
checker.Check()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
|
||||||
|
log.Error("failed to handle timetick message", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.helper.eventAfterHandleDataNodeTt()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
|
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
|
||||||
|
|
|
@ -43,6 +43,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -2247,6 +2248,42 @@ func TestGetFlushState(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// https://github.com/milvus-io/milvus/issues/15659
|
||||||
|
func TestIssue15659(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
s := &Server{
|
||||||
|
helper: ServerHelper{
|
||||||
|
eventAfterHandleDataNodeTt: func() {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ms := &MockClosePanicMsgstream{}
|
||||||
|
ms.On("Consume").Return(&msgstream.MsgPack{})
|
||||||
|
ch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
assert.NotPanics(t, func() {
|
||||||
|
s.serverLoopWg.Add(1)
|
||||||
|
s.handleDataNodeTimetickMsgstream(ctx, ms)
|
||||||
|
close(ch)
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
cancel()
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
type MockClosePanicMsgstream struct {
|
||||||
|
mock.Mock
|
||||||
|
msgstream.MsgStream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *MockClosePanicMsgstream) Close() {
|
||||||
|
panic("mocked close panic")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *MockClosePanicMsgstream) Consume() *msgstream.MsgPack {
|
||||||
|
args := ms.Called()
|
||||||
|
return args.Get(0).(*msgstream.MsgPack)
|
||||||
|
}
|
||||||
|
|
||||||
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
|
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
|
Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
|
||||||
|
|
Loading…
Reference in New Issue