diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 0f6d014c2e..61fc8ff25c 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -432,6 +432,7 @@ func (ms *mqMsgStream) Chan() <-chan *MsgPack { return ms.receiveBuf } +// Seek reset the subscription associated with this consumer to a specific position func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { for _, mp := range msgPositions { consumer, ok := ms.consumers[mp.ChannelName]