Handle server error (#9032)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/9175/head
godchen 2021-10-03 22:17:56 +08:00 committed by GitHub
parent 1a05a939c1
commit 2792f85211
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 7 deletions

View File

@ -307,7 +307,10 @@ func (s *Server) startServerLoop() {
go s.startWatchService(s.serverLoopCtx)
go s.startFlushLoop(s.serverLoopCtx)
go s.session.LivenessCheck(s.serverLoopCtx, s.liveCh, func() {
s.Stop()
err := s.Stop()
if err != nil {
log.Error("server stop fail", zap.Error(err))
}
})
}
@ -394,7 +397,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
ch := ttMsg.ChannelName
ts := ttMsg.Timestamp
s.segmentManager.ExpireAllocations(ch, ts)
err = s.segmentManager.ExpireAllocations(ch, ts)
if err != nil {
log.Warn("expire allocations failed", zap.Error(err))
continue
}
segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
log.Warn("get flushable segments failed", zap.Error(err))

View File

@ -648,7 +648,8 @@ func TestChannel(t *testing.T) {
segInfo := &datapb.SegmentInfo{
ID: segID,
}
svr.meta.AddSegment(NewSegmentInfo(segInfo))
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
stats := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
@ -680,7 +681,7 @@ func TestChannel(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 123))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 345))
err := statsStream.Produce(&msgPack)
err = statsStream.Produce(&msgPack)
assert.Nil(t, err)
})
}
@ -870,7 +871,8 @@ func TestDataNodeTtChannel(t *testing.T) {
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime)
msgPack.Msgs = append(msgPack.Msgs, msg)
ttMsgStream.Produce(&msgPack)
err = ttMsgStream.Produce(&msgPack)
assert.Nil(t, err)
flushMsg := <-ch
flushReq := flushMsg.(*datapb.FlushSegmentsRequest)
@ -955,7 +957,8 @@ func TestDataNodeTtChannel(t *testing.T) {
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime)
msgPack.Msgs = append(msgPack.Msgs, msg)
ttMsgStream.Produce(&msgPack)
err = ttMsgStream.Produce(&msgPack)
assert.Nil(t, err)
flushMsg := <-ch
flushReq := flushMsg.(*datapb.FlushSegmentsRequest)
assert.EqualValues(t, 1, len(flushReq.SegmentIDs))
@ -1019,7 +1022,8 @@ func TestDataNodeTtChannel(t *testing.T) {
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", resp.SegIDAssignments[0].ExpireTime)
msgPack.Msgs = append(msgPack.Msgs, msg)
ttMsgStream.Produce(&msgPack)
err = ttMsgStream.Produce(&msgPack)
assert.Nil(t, err)
<-ch
segment = svr.meta.GetSegment(assignedSegmentID)