Add rmq consumer close (#7596)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/7608/head
godchen 2021-09-08 21:49:59 +08:00 committed by GitHub
parent d3a18a66b5
commit 3c2821e5e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 2 deletions

View File

@ -55,7 +55,7 @@ func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
return nil, err
}
rConsumer := &RmqConsumer{c: cli}
rConsumer := &RmqConsumer{c: cli, closeCh: make(chan struct{})}
return rConsumer, nil
}

View File

@ -18,6 +18,7 @@ import (
type RmqConsumer struct {
c rocksmq.Consumer
msgChannel chan ConsumerMessage
closeCh chan struct{}
}
func (rc *RmqConsumer) Subscription() string {
@ -25,7 +26,6 @@ func (rc *RmqConsumer) Subscription() string {
}
func (rc *RmqConsumer) Chan() <-chan ConsumerMessage {
if rc.msgChannel == nil {
rc.msgChannel = make(chan ConsumerMessage)
go func() {
@ -37,6 +37,9 @@ func (rc *RmqConsumer) Chan() <-chan ConsumerMessage {
return
}
rc.msgChannel <- &rmqMessage{msg: msg}
case <-rc.closeCh:
close(rc.msgChannel)
return
}
}
}()
@ -53,4 +56,6 @@ func (rc *RmqConsumer) Ack(message ConsumerMessage) {
}
func (rc *RmqConsumer) Close() {
rc.c.Close()
close(rc.closeCh)
}