Add msgstream seek (#5483)

* Add msgstream seek

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* change test name

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* remove file

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* resolve conversation

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5516/head
godchen 2021-05-31 17:46:30 +08:00 committed by GitHub
parent 9e91860ae9
commit 918458a1be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 3 deletions

View File

@ -12,6 +12,7 @@
package msgstream
import (
"bytes"
"context"
"errors"
"fmt"
@ -128,8 +129,6 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
ms.consumers[channel] = pc
ms.consumerChannels = append(ms.consumerChannels, channel)
ms.consumerLock.Unlock()
ms.wait.Add(1)
go ms.receiveMsg(pc)
return nil
}
err := Retry(20, time.Millisecond*200, fn)
@ -145,6 +144,10 @@ func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
}
func (ms *mqMsgStream) Start() {
for _, c := range ms.consumers {
ms.wait.Add(1)
go ms.receiveMsg(c)
}
}
func (ms *mqMsgStream) Close() {
@ -351,7 +354,11 @@ func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
}
msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
msgPack := MsgPack{
Msgs: []TsMsg{tsMsg},
StartPositions: []*internalpb.MsgPosition{tsMsg.Position()},
EndPositions: []*internalpb.MsgPosition{tsMsg.Position()},
}
ms.receiveBuf <- &msgPack
sp.Finish()
@ -377,6 +384,18 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
if err != nil {
return err
}
msg, ok := <-consumer.Chan()
if !ok {
return errors.New("consumer closed")
}
consumer.Ack(msg)
if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) {
err = fmt.Errorf("seek msg not correct")
log.Error("msMsgStream seek", zap.Error(err))
}
return nil
}
return nil
}

View File

@ -881,6 +881,49 @@ func TestStream_PulsarTtMsgStream_2(t *testing.T) {
inputStream2.Close()
}
func TestStream_MqMsgStream_Seek(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
consumerChannels := []string{c}
consumerSubName := funcutil.RandomString(8)
msgPack := &MsgPack{}
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
for i := 0; i < 10; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err := inputStream.Produce(msgPack)
assert.Nil(t, err)
var seekPosition *internalpb.MsgPosition
for i := 0; i < 10; i++ {
result := outputStream.Consume()
assert.Equal(t, result.Msgs[0].ID(), int64(i))
if i == 5 {
seekPosition = result.EndPositions[0]
}
}
outputStream.Close()
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName)
outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})
outputStream2.Start()
for i := 6; i < 10; i++ {
result := outputStream2.Consume()
assert.Equal(t, result.Msgs[0].ID(), int64(i))
}
outputStream2.Close()
}
/****************************************Rmq test******************************************/
func initRmq(name string) *etcdkv.EtcdKV {