Fix rocksmq SeekToLatest (#11029)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/11039/head
congqixia 2021-11-01 19:35:51 +08:00 committed by GitHub
parent a7397a90e7
commit be0daafdb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 3 deletions

View File

@ -1619,3 +1619,42 @@ func printMsgPack(msgPack *MsgPack) {
}
log.Println("================")
}
func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
producerChannels := []string{"insert1"}
consumerChannels := []string{"insert1"}
consumerSubName := "subInsert"
rocksdbName := "/tmp/rocksmq_asconsumer_withpos"
etcdKV := initRmq(rocksdbName)
factory := ProtoUDFactory{}
rmqClient, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq})
otherInputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
otherInputStream.AsProducer([]string{"root_timetick"})
otherInputStream.Start()
otherInputStream.Produce(getTimeTickMsgPack(999))
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
for i := 0; i < 100; i++ {
inputStream.Produce(getTimeTickMsgPack(int64(i)))
}
rmqClient2, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumerWithPosition(consumerChannels, consumerSubName, mqclient.SubscriptionPositionLatest)
outputStream.Start()
inputStream.Produce(getTimeTickMsgPack(1000))
pack := outputStream.Consume()
assert.NotNil(t, pack)
assert.Equal(t, 1, len(pack.Msgs))
assert.EqualValues(t, 1000, pack.Msgs[0].BeginTs())
Close(rocksdbName, inputStream, outputStream, etcdKV)
}

View File

@ -12,6 +12,7 @@
package rocksmq
import (
"bytes"
"errors"
"fmt"
"strconv"
@ -722,14 +723,30 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
fixChanName, _ := fixChannelName(topicName)
iter.Seek([]byte(fixChanName + "/"))
var last []byte
// iter.SeekToLast bypass prefix limitation
// use for range until find next prefix for now
if iter.Valid() {
iter.SeekToLast()
last = iter.Key().Data()
current := last
for bytes.HasPrefix(current, []byte(topicName)) {
iter.Next()
if iter.Valid() {
current = last
last = iter.Key().Data()
} else {
break
}
}
} else {
// In this case there are no messages, so shouldn't return error
return nil
}
msgKey := iter.Key()
msgID, err := strconv.ParseInt(string(msgKey.Data())[FixedChannelNameLen+1:], 10, 64)
if len(last) <= FixedChannelNameLen {
return nil
}
msgID, err := strconv.ParseInt(string(last)[FixedChannelNameLen+1:], 10, 64)
if err != nil {
return err
}