mirror of https://github.com/milvus-io/milvus.git
parent
b718bcb1c7
commit
d3027c0d28
|
@ -96,9 +96,9 @@ func (ms *mqMsgStream) AsProducer(channels []string) {
|
|||
}
|
||||
|
||||
ms.producerLock.Lock()
|
||||
defer ms.producerLock.Unlock()
|
||||
ms.producers[channel] = pp
|
||||
ms.producerChannels = append(ms.producerChannels, channel)
|
||||
ms.producerLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
|
||||
|
@ -131,9 +131,9 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
|
|||
}
|
||||
|
||||
ms.consumerLock.Lock()
|
||||
defer ms.consumerLock.Unlock()
|
||||
ms.consumers[channel] = pc
|
||||
ms.consumerChannels = append(ms.consumerChannels, channel)
|
||||
ms.consumerLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
|
||||
|
@ -243,15 +243,18 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
|
|||
|
||||
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
if err := ms.producers[channel].Send(
|
||||
spanCtx,
|
||||
msg,
|
||||
); err != nil {
|
||||
ms.producerLock.Unlock()
|
||||
trace.LogError(sp, err)
|
||||
sp.Finish()
|
||||
return err
|
||||
}
|
||||
sp.Finish()
|
||||
ms.producerLock.Unlock()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -285,6 +288,7 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
|
|||
spanCtx,
|
||||
msg,
|
||||
); err != nil {
|
||||
ms.producerLock.Unlock()
|
||||
trace.LogError(sp, err)
|
||||
sp.Finish()
|
||||
return err
|
||||
|
@ -494,8 +498,8 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
|
|||
}
|
||||
|
||||
ms.consumerLock.Lock()
|
||||
defer ms.consumerLock.Unlock()
|
||||
ms.addConsumer(pc, channel)
|
||||
ms.consumerLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
|
||||
|
|
Loading…
Reference in New Issue