mirror of https://github.com/milvus-io/milvus.git
Fix pulsar consumer goroutine leakage (#7004)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/6965/head
parent
2f5d78a596
commit
9eb35996b5
|
@ -67,7 +67,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
|
|||
}
|
||||
//consumer.Seek(pulsar.EarliestMessageID())
|
||||
//consumer.SeekByTime(time.Unix(0, 0))
|
||||
pConsumer := &pulsarConsumer{c: consumer}
|
||||
pConsumer := &pulsarConsumer{c: consumer, closeCh: make(chan struct{})}
|
||||
|
||||
return pConsumer, nil
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ type pulsarConsumer struct {
|
|||
c pulsar.Consumer
|
||||
msgChannel chan ConsumerMessage
|
||||
hasSeek bool
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
func (pc *pulsarConsumer) Subscription() string {
|
||||
|
@ -39,11 +40,13 @@ func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
|
|||
select {
|
||||
case msg, ok := <-pc.c.Chan():
|
||||
if !ok {
|
||||
close(pc.msgChannel)
|
||||
log.Debug("pulsar consumer channel closed")
|
||||
return
|
||||
}
|
||||
pc.msgChannel <- &pulsarMessage{msg: msg}
|
||||
case <-pc.closeCh: // workaround for pulsar consumer.receiveCh not closed
|
||||
close(pc.msgChannel)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -67,4 +70,5 @@ func (pc *pulsarConsumer) Ack(message ConsumerMessage) {
|
|||
|
||||
func (pc *pulsarConsumer) Close() {
|
||||
pc.c.Close()
|
||||
close(pc.closeCh)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue