enhance: Try LatestMessageID when checkpoint unmarshal fails (#33158)

See also #33122

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/33193/head
congqixia 2024-05-21 16:59:39 +08:00 committed by GitHub
parent 7ab7e3a004
commit 12e8c6c583
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 17 additions and 6 deletions

View File

@ -482,10 +482,15 @@ func (ms *mqMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPositi
messageID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
continue
// try to use latest message ID first
messageID, err = consumer.GetLatestMsgID()
if err != nil {
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
continue
}
} else {
return err
}
return err
}
log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
@ -853,11 +858,17 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi
seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
return false, nil
// try to use latest message ID first
seekMsgID, err = consumer.GetLatestMsgID()
if err != nil {
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
return false, nil
}
} else {
return false, err
}
return false, err
}
log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
err = consumer.Seek(seekMsgID, true)
if err != nil {