Fix Pulsar seek to latest may block forever (#11128)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/11314/head
Xiaofan 2021-11-05 14:55:44 +08:00 committed by GitHub
parent eac24979ff
commit a4715996ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 132 additions and 15 deletions

View File

@ -511,6 +511,7 @@ func (ms *mqMsgStream) Chan() <-chan *MsgPack {
}
// Seek reset the subscription associated with this consumer to a specific position
// User has to ensure mq_msgstream is not closed before seek, and the seek position is already written.
func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
for _, mp := range msgPositions {
consumer, ok := ms.consumers[mp.ChannelName]
@ -521,19 +522,20 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
if err != nil {
return err
}
log.Debug("MsgStream begin to seek", zap.Any("MessageID", messageID))
log.Debug("MsgStream begin to seek", zap.Any("MessageID", mp.MsgID))
err = consumer.Seek(messageID)
if err != nil {
log.Debug("Failed to seek", zap.Error(err))
return err
}
log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID))
if _, ok := consumer.(*mqclient.RmqConsumer); !ok {
log.Debug("MsgStream begin to read one message after seek")
if _, ok := consumer.(*mqclient.PulsarConsumer); ok {
log.Debug("MsgStream start to pop one message after seek")
msg, ok := <-consumer.Chan()
if !ok {
return errors.New("consumer closed")
}
log.Debug("MsgStream finish reading one message after seek")
log.Debug("MsgStream finish to pop one message after seek")
consumer.Ack(msg)
if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) {
err = fmt.Errorf("seek msg not correct")

View File

@ -27,6 +27,7 @@ import (
"sync"
"testing"
"time"
"unsafe"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
@ -322,7 +323,7 @@ func TestMqMsgStream_Chan(t *testing.T) {
}
}
func TestMqMsgStream_Seek(t *testing.T) {
func TestMqMsgStream_SeekNotSubscribed(t *testing.T) {
f := &fixture{t: t}
parameters := f.setup()
defer f.teardown()
@ -1055,6 +1056,98 @@ func TestStream_MqMsgStream_Seek(t *testing.T) {
}
func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
consumerChannels := []string{c}
consumerSubName := funcutil.RandomString(8)
msgPack := &MsgPack{}
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
for i := 0; i < 10; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err := inputStream.Produce(msgPack)
assert.Nil(t, err)
var seekPosition *internalpb.MsgPosition
for i := 0; i < 10; i++ {
result := outputStream.Consume()
assert.Equal(t, result.Msgs[0].ID(), int64(i))
seekPosition = result.EndPositions[0]
}
outputStream.Close()
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName)
messageID, _ := pulsar.DeserializeMessageID(seekPosition.MsgID)
// try to seek to not written position
patchMessageID(&messageID, 11)
p := []*internalpb.MsgPosition{
{
ChannelName: seekPosition.ChannelName,
Timestamp: seekPosition.Timestamp,
MsgGroup: seekPosition.MsgGroup,
MsgID: messageID.Serialize(),
},
}
go func() {
time.Sleep(1 * time.Second)
outputStream2.Close()
}()
err = outputStream2.Seek(p)
assert.Error(t, err)
}
func TestStream_MqMsgStream_SeekLatest(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
consumerChannels := []string{c}
consumerSubName := funcutil.RandomString(8)
msgPack := &MsgPack{}
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
for i := 0; i < 10; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err := inputStream.Produce(msgPack)
assert.Nil(t, err)
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumerWithPosition(consumerChannels, consumerSubName, mqclient.SubscriptionPositionLatest)
outputStream2.Start()
msgPack.Msgs = nil
// produce another 10 tsMs
for i := 10; i < 20; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err = inputStream.Produce(msgPack)
assert.Nil(t, err)
for i := 10; i < 20; i++ {
result := outputStream2.Consume()
assert.Equal(t, result.Msgs[0].ID(), int64(i))
}
outputStream2.Close()
}
/****************************************Rmq test******************************************/
func initRmq(name string) *etcdkv.EtcdKV {
@ -1642,3 +1735,24 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
Close(rocksdbName, inputStream, outputStream, etcdKV)
}
func patchMessageID(mid *pulsar.MessageID, entryID int64) {
// use direct unsafe conversion
/* #nosec G103 */
r := (*iface)(unsafe.Pointer(mid))
id := (*messageID)(r.Data)
id.entryID = entryID
}
// unsafe access pointer, same as pulsar.messageID
type messageID struct {
ledgerID int64
entryID int64
batchID int32
partitionIdx int32
}
// interface struct mapping
type iface struct {
Type, Data unsafe.Pointer
}

View File

@ -68,10 +68,10 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
return nil, err
}
pConsumer := &pulsarConsumer{c: consumer, closeCh: make(chan struct{})}
pConsumer := &PulsarConsumer{c: consumer, closeCh: make(chan struct{})}
// prevent seek to earliest patch applied when using latest position options
if options.SubscriptionInitialPosition == SubscriptionPositionLatest {
pConsumer.hasSeek = true
pConsumer.AtLatest = true
}
return pConsumer, nil

View File

@ -19,26 +19,27 @@ import (
"github.com/milvus-io/milvus/internal/log"
)
type pulsarConsumer struct {
type PulsarConsumer struct {
c pulsar.Consumer
msgChannel chan ConsumerMessage
hasSeek bool
AtLatest bool
closeCh chan struct{}
once sync.Once
}
func (pc *pulsarConsumer) Subscription() string {
func (pc *PulsarConsumer) Subscription() string {
return pc.c.Subscription()
}
func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
func (pc *PulsarConsumer) Chan() <-chan ConsumerMessage {
if pc.msgChannel == nil {
pc.once.Do(func() {
pc.msgChannel = make(chan ConsumerMessage, 256)
// this part handles msgstream expectation when the consumer is not seeked
// pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked
// yet, our message stream is to setting to the very start point of the topic
if !pc.hasSeek {
if !pc.hasSeek && !pc.AtLatest {
// the concrete value of the MessageID is pulsar.messageID{-1,-1,-1,-1}
// but Seek function logic does not allow partitionID -1, See line 618-620 of github.com/apache/pulsar-client-go@v0.5.0 pulsar/consumer_impl.go
mid := pulsar.EarliestMessageID()
@ -70,7 +71,7 @@ func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
// Seek seek consume position to the pointed messageID,
// the pointed messageID will be consumed after the seek in pulsar
func (pc *pulsarConsumer) Seek(id MessageID) error {
func (pc *PulsarConsumer) Seek(id MessageID) error {
messageID := id.(*pulsarID).messageID
err := pc.c.Seek(messageID)
if err == nil {
@ -79,12 +80,12 @@ func (pc *pulsarConsumer) Seek(id MessageID) error {
return err
}
func (pc *pulsarConsumer) Ack(message ConsumerMessage) {
func (pc *PulsarConsumer) Ack(message ConsumerMessage) {
pm := message.(*pulsarMessage)
pc.c.Ack(pm.msg)
}
func (pc *pulsarConsumer) Close() {
func (pc *PulsarConsumer) Close() {
pc.c.Close()
close(pc.closeCh)
}

View File

@ -704,7 +704,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
key := groupName + "/" + topicName + "/current_id"
key := constructCurrentID(topicName, groupName)
if !rmq.checkKeyExist(key) {
log.Debug("RocksMQ: channel " + key + " not exists")
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)