Fix mqconsumer data race (#8752)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/8770/head
congqixia 2021-09-28 17:04:04 +08:00 committed by GitHub
parent d374ea71ba
commit 01b4253a74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 38 deletions

View File

@ -12,6 +12,7 @@
package mqclient
import (
"sync"
"unsafe"
"github.com/apache/pulsar-client-go/pulsar"
@ -23,6 +24,7 @@ type pulsarConsumer struct {
msgChannel chan ConsumerMessage
hasSeek bool
closeCh chan struct{}
once sync.Once
}
func (pc *pulsarConsumer) Subscription() string {
@ -31,35 +33,37 @@ func (pc *pulsarConsumer) Subscription() string {
func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
if pc.msgChannel == nil {
pc.msgChannel = make(chan ConsumerMessage)
// this part handles msgstream expectation when the consumer is not seeked
// pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked
// yet, our message stream is to setting to the very start point of the topic
if !pc.hasSeek {
// the concrete value of the MessageID is pulsar.messageID{-1,-1,-1,-1}
// but Seek function logic does not allow partitionID -1, See line 618-620 of github.com/apache/pulsar-client-go@v0.5.0 pulsar/consumer_impl.go
mid := pulsar.EarliestMessageID()
// the patch function use unsafe pointer to set partitionIdx to 0, which is the valid default partition index of current use case
// NOTE: when pulsar client version check, do check this logic is fixed or offset is changed!!!
// NOTE: unsafe solution, check implementation asap
patchEarliestMessageID(&mid)
pc.c.Seek(mid)
}
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-pc.c.Chan():
if !ok {
log.Debug("pulsar consumer channel closed")
pc.once.Do(func() {
pc.msgChannel = make(chan ConsumerMessage)
// this part handles msgstream expectation when the consumer is not seeked
// pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked
// yet, our message stream is to setting to the very start point of the topic
if !pc.hasSeek {
// the concrete value of the MessageID is pulsar.messageID{-1,-1,-1,-1}
// but Seek function logic does not allow partitionID -1, See line 618-620 of github.com/apache/pulsar-client-go@v0.5.0 pulsar/consumer_impl.go
mid := pulsar.EarliestMessageID()
// the patch function use unsafe pointer to set partitionIdx to 0, which is the valid default partition index of current use case
// NOTE: when pulsar client version check, do check this logic is fixed or offset is changed!!!
// NOTE: unsafe solution, check implementation asap
patchEarliestMessageID(&mid)
pc.c.Seek(mid)
}
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-pc.c.Chan():
if !ok {
log.Debug("pulsar consumer channel closed")
return
}
pc.msgChannel <- &pulsarMessage{msg: msg}
case <-pc.closeCh: // workaround for pulsar consumer.receiveCh not closed
close(pc.msgChannel)
return
}
pc.msgChannel <- &pulsarMessage{msg: msg}
case <-pc.closeCh: // workaround for pulsar consumer.receiveCh not closed
close(pc.msgChannel)
return
}
}
}()
}()
})
}
return pc.msgChannel
}

View File

@ -12,6 +12,8 @@
package mqclient
import (
"sync"
"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
)
@ -19,6 +21,7 @@ type RmqConsumer struct {
c rocksmq.Consumer
msgChannel chan ConsumerMessage
closeCh chan struct{}
once sync.Once
}
func (rc *RmqConsumer) Subscription() string {
@ -27,22 +30,24 @@ func (rc *RmqConsumer) Subscription() string {
func (rc *RmqConsumer) Chan() <-chan ConsumerMessage {
if rc.msgChannel == nil {
rc.msgChannel = make(chan ConsumerMessage)
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-rc.c.Chan():
if !ok {
rc.once.Do(func() {
rc.msgChannel = make(chan ConsumerMessage)
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-rc.c.Chan():
if !ok {
close(rc.msgChannel)
return
}
rc.msgChannel <- &rmqMessage{msg: msg}
case <-rc.closeCh:
close(rc.msgChannel)
return
}
rc.msgChannel <- &rmqMessage{msg: msg}
case <-rc.closeCh:
close(rc.msgChannel)
return
}
}
}()
}()
})
}
return rc.msgChannel
}