Add log for mqtt msgstream seek (#17640)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/17646/head
Xiaofan 2022-06-20 15:18:13 +08:00 committed by GitHub
parent f2bd910df5
commit e36fce1fd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 4 deletions

View File

@ -548,13 +548,13 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
return err
}
log.Debug("MsgStream begin to seek start msg: ", zap.Any("MessageID", messageID))
log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", messageID))
err = consumer.Seek(messageID, false)
if err != nil {
log.Debug("Failed to seek", zap.Error(err))
log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
return err
}
log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID))
log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName))
}
return nil
}
@ -673,7 +673,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
// block here until addConsumer
if _, ok := <-ms.syncConsumer; !ok {
log.Debug("consumer closed!")
log.Warn("consumer closed!")
return
}
@ -859,10 +859,13 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
if err != nil {
return err
}
log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", seekMsgID))
err = consumer.Seek(seekMsgID, true)
if err != nil {
log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
return err
}
log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName))
return nil
}