diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 41f6b260de..86ad3f7dfe 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -482,10 +482,15 @@ func (ms *mqMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPositi messageID, err := ms.client.BytesToMsgID(mp.MsgID) if err != nil { if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() { - log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err)) - continue + // try to use latest message ID first + messageID, err = consumer.GetLatestMsgID() + if err != nil { + log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err)) + continue + } + } else { + return err } - return err } log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID)) @@ -853,11 +858,17 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID) if err != nil { if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() { - log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err)) - return false, nil + // try to use latest message ID first + seekMsgID, err = consumer.GetLatestMsgID() + if err != nil { + log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err)) + return false, nil + } + } else { + return false, err } - return false, err } + log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID)) err = consumer.Seek(seekMsgID, true) if err != nil {