Rocksmq support SubscriptionPositionLatest (#8947)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/8984/head
yukun 2021-09-30 20:51:40 +08:00 committed by GitHub
parent 6b74c7f79f
commit c486007796
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 147 additions and 29 deletions

View File

@ -47,9 +47,10 @@ func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
receiveChannel := make(chan rocksmq.ConsumerMessage, options.BufSize)
cli, err := rc.client.Subscribe(rocksmq.ConsumerOptions{
Topic: options.Topic,
SubscriptionName: options.SubscriptionName,
MessageChannel: receiveChannel,
Topic: options.Topic,
SubscriptionName: options.SubscriptionName,
MessageChannel: receiveChannel,
SubscriptionInitialPosition: rocksmq.SubscriptionInitialPosition(options.SubscriptionInitialPosition),
})
if err != nil {
return nil, err

View File

@ -94,9 +94,10 @@ func TestRmqClient_Subscribe(t *testing.T) {
subName := "subName"
consumerOpts := ConsumerOptions{
Topic: "",
SubscriptionName: subName,
BufSize: 1024,
Topic: "",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
BufSize: 1024,
}
consumer, err := client.Subscribe(consumerOpts)

View File

@ -76,10 +76,16 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
}
if exist, con := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName); exist {
log.Debug("ConsumerGroup already existed", zap.Any("topic", options.Topic), zap.Any("SubscriptionName", options.SubscriptionName))
consumer, err := newConsumer1(c, options, con.MsgMutex)
consumer, err := getExistedConsumer(c, options, con.MsgMutex)
if err != nil {
return nil, err
}
if options.SubscriptionInitialPosition == SubscriptionPositionLatest {
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err
}
}
c.wg.Add(1)
go c.consume(consumer)
return consumer, nil
@ -95,6 +101,12 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
return nil, err
}
if options.SubscriptionInitialPosition == SubscriptionPositionLatest {
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err
}
}
// Register self in rocksmq server
cons := &server.Consumer{
Topic: consumer.topic,

View File

@ -77,8 +77,9 @@ func TestClient_Subscribe(t *testing.T) {
assert.NoError(t, err)
consumer, err := client.Subscribe(ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Error(t, err)
assert.Nil(t, consumer)
@ -93,8 +94,9 @@ func TestClient_Subscribe(t *testing.T) {
assert.NoError(t, err)
defer client1.Close()
opt := ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
}
consumer1, err := client1.Subscribe(opt)
assert.NoError(t, err)
@ -103,6 +105,18 @@ func TestClient_Subscribe(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, consumer2)
opt1 := ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionLatest,
}
consumer3, err := client1.Subscribe(opt1)
assert.Error(t, err)
assert.Nil(t, consumer3)
consumer4, err := client1.Subscribe(opt1)
assert.Error(t, err)
assert.Nil(t, consumer4)
producer1, err := client1.CreateProducer(ProducerOptions{
Topic: newTopicName(),
})
@ -130,8 +144,9 @@ func TestClient_consume(t *testing.T) {
assert.NoError(t, err)
opt := ConsumerOptions{
Topic: topicName,
SubscriptionName: newConsumerName(),
Topic: topicName,
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
}
consumer, err := client.Subscribe(opt)
assert.NoError(t, err)

View File

@ -54,7 +54,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
}, nil
}
func newConsumer1(c *client, options ConsumerOptions, msgMutex chan struct{}) (*consumer, error) {
func getExistedConsumer(c *client, options ConsumerOptions, msgMutex chan struct{}) (*consumer, error) {
if c == nil {
return nil, newError(InvalidConfiguration, "client is nil")
}

View File

@ -23,7 +23,7 @@ func TestConsumer_newConsumer(t *testing.T) {
consumer, err := newConsumer(nil, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionLatest,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, consumer)
assert.NotNil(t, err)
@ -34,7 +34,7 @@ func TestConsumer_newConsumer(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
consumer, err = newConsumer1(newMockClient(), ConsumerOptions{}, nil)
consumer, err = getExistedConsumer(newMockClient(), ConsumerOptions{}, nil)
assert.Nil(t, consumer)
assert.NotNil(t, err)
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
@ -58,8 +58,9 @@ func TestConsumer_newConsumer(t *testing.T) {
defer client.Close()
consumerName := newConsumerName()
consumer1, err := newConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: consumerName,
Topic: newTopicName(),
SubscriptionName: consumerName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.NoError(t, err)
assert.NotNil(t, consumer1)
@ -79,22 +80,24 @@ func TestConsumer_newConsumer(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, consumer3)
consumer4, err := newConsumer1(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
consumer4, err := getExistedConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
}, nil)
assert.NoError(t, err)
assert.NotNil(t, consumer4)
consumer5, err := newConsumer1(client, ConsumerOptions{
consumer5, err := getExistedConsumer(client, ConsumerOptions{
Topic: "",
}, nil)
assert.Error(t, err)
assert.Nil(t, consumer5)
consumer6, err := newConsumer1(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: "",
consumer6, err := getExistedConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: "",
SubscriptionInitialPosition: SubscriptionPositionEarliest,
}, nil)
assert.Error(t, err)
assert.Nil(t, consumer6)
@ -104,8 +107,9 @@ func TestConsumer_Subscription(t *testing.T) {
topicName := newTopicName()
consumerName := newConsumerName()
consumer, err := newConsumer(newMockClient(), ConsumerOptions{
Topic: topicName,
SubscriptionName: consumerName,
Topic: topicName,
SubscriptionName: consumerName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, consumer)
assert.NotNil(t, err)
@ -126,8 +130,9 @@ func TestConsumer_Seek(t *testing.T) {
topicName := newTopicName()
consumerName := newConsumerName()
consumer, err := newConsumer(client, ConsumerOptions{
Topic: topicName,
SubscriptionName: consumerName,
Topic: topicName,
SubscriptionName: consumerName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.NoError(t, err)
assert.NotNil(t, consumer)

View File

@ -43,6 +43,7 @@ type RocksMQ interface {
Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error)
Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)
Seek(topicName string, groupName string, msgID UniqueID) error
SeekToLatest(topicName, groupName string) error
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer)
Notify(topicName, groupName string)

View File

@ -653,6 +653,37 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
return nil
}
func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
key := groupName + "/" + topicName + "/current_id"
if !rmq.checkKeyExist(key) {
log.Debug("RocksMQ: channel " + key + " not exists")
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
}
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
readOpts.SetPrefixSameAsStart(true)
iter := rmq.store.NewIterator(readOpts)
defer iter.Close()
fixChanName, _ := fixChannelName(topicName)
iter.Seek([]byte(fixChanName + "/"))
if iter.Valid() {
iter.SeekToLast()
} else {
return fmt.Errorf("RocksMQ: can't get message key of channel %s", topicName)
}
msgKey := iter.Key()
msgID, err := strconv.ParseInt(string(msgKey.Data())[FixedChannelNameLen+1:], 10, 64)
if err != nil {
return err
}
err = rmq.kv.Save(key, strconv.FormatInt(msgID, 10))
return err
}
func (rmq *rocksmq) Notify(topicName, groupName string) {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {

View File

@ -552,3 +552,55 @@ func TestRocksmq_CopyData(t *testing.T) {
assert.Equal(t, 3, len(cMsgs1))
assert.Equal(t, emptyTargetData, cMsgs1[0].Payload)
}
func TestRocksmq_SeekToLatest(t *testing.T) {
ep := etcdEndpoints()
etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root")
assert.Nil(t, err)
defer etcdKV.Close()
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_seektolatest"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
channelName := "channel_test"
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
defer rmq.DestroyTopic(channelName)
loopNum := 100
err = rmq.SeekToLatest(channelName, "dummy_group")
assert.Error(t, err)
// Consume loopNum message once
groupName := "group_test"
_ = rmq.DestroyConsumerGroup(channelName, groupName)
err = rmq.CreateConsumerGroup(channelName, groupName)
assert.Nil(t, err)
err = rmq.SeekToLatest(channelName, groupName)
assert.Error(t, err)
pMsgs := make([]ProducerMessage, loopNum)
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i+loopNum)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
_, err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
err = rmq.SeekToLatest(channelName, groupName)
assert.Nil(t, err)
cMsgs, err := rmq.Consume(channelName, groupName, loopNum)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 0)
}