diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 444f9dc55e..89971801f7 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -548,13 +548,13 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { return err } - log.Debug("MsgStream begin to seek start msg: ", zap.Any("MessageID", messageID)) + log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", messageID)) err = consumer.Seek(messageID, false) if err != nil { - log.Debug("Failed to seek", zap.Error(err)) + log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) return err } - log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID)) + log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName)) } return nil } @@ -673,7 +673,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { // block here until addConsumer if _, ok := <-ms.syncConsumer; !ok { - log.Debug("consumer closed!") + log.Warn("consumer closed!") return } @@ -859,10 +859,13 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { if err != nil { return err } + log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", seekMsgID)) err = consumer.Seek(seekMsgID, true) if err != nil { + log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) return err } + log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName)) return nil }