diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 62e48dc528..2268f9e6fc 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -225,6 +225,8 @@ func (d *Dispatcher) work() { } if err != nil { t.pos = pack.StartPositions[0] + // replace the pChannel with vChannel + t.pos.ChannelName = t.vchannel d.lagTargets.LoadOrStore(t.vchannel, t) d.nonBlockingNotify() delete(d.targets, vchannel)