mirror of https://github.com/milvus-io/milvus.git
enhance: [2.3]Try LatestMessageID when checkpoint unmarshal fails (#33159)
Cherry-pick from master pr: #33158 See also #33122 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/33215/head
parent
76b7c23a66
commit
e325ec0a35
|
@ -482,10 +482,15 @@ func (ms *mqMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPositi
|
||||||
messageID, err := ms.client.BytesToMsgID(mp.MsgID)
|
messageID, err := ms.client.BytesToMsgID(mp.MsgID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
|
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
|
||||||
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
|
// try to use latest message ID first
|
||||||
continue
|
messageID, err = consumer.GetLatestMsgID()
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
|
log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
|
||||||
|
@ -853,11 +858,17 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi
|
||||||
seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
|
seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
|
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
|
||||||
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
|
// try to use latest message ID first
|
||||||
return false, nil
|
seekMsgID, err = consumer.GetLatestMsgID()
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return false, err
|
||||||
}
|
}
|
||||||
return false, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
|
log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
|
||||||
err = consumer.Seek(seekMsgID, true)
|
err = consumer.Seek(seekMsgID, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue