Add msgstream reader (#12130)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/12130/merge
godchen 2021-11-19 15:57:12 +08:00 committed by GitHub
parent 7e91bcf115
commit 51b353b52c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 187 additions and 3 deletions

View File

@ -67,6 +67,7 @@ func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
}
func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
func (mtm *mockTtMsgStream) AsReader(channels []string) {}
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
}
@ -96,6 +97,13 @@ func (mtm *mockTtMsgStream) Consume() *msgstream.MsgPack {
func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error {
return nil
}
func (mtm *mockTtMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error {
return nil
}
func (mtm *mockTtMsgStream) Next(ctx context.Context, channelName string) (msgstream.TsMsg, error) {
return nil, nil
}
func TestNewDmInputNode(t *testing.T) {
ctx := context.Background()

View File

@ -46,6 +46,8 @@ type mqMsgStream struct {
producerChannels []string
consumers map[string]mqclient.Consumer
consumerChannels []string
readers map[string]mqclient.Reader
readerChannels []string
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
@ -54,6 +56,7 @@ type mqMsgStream struct {
bufSize int64
producerLock *sync.Mutex
consumerLock *sync.Mutex
readerLock *sync.Mutex
}
// NewMqMsgStream is used to generate a new mqMsgStream object
@ -66,8 +69,10 @@ func NewMqMsgStream(ctx context.Context,
streamCtx, streamCancel := context.WithCancel(ctx)
producers := make(map[string]mqclient.Producer)
consumers := make(map[string]mqclient.Consumer)
readers := make(map[string]mqclient.Reader)
producerChannels := make([]string, 0)
consumerChannels := make([]string, 0)
readerChannels := make([]string, 0)
receiveBuf := make(chan *MsgPack, receiveBufSize)
stream := &mqMsgStream{
@ -77,12 +82,15 @@ func NewMqMsgStream(ctx context.Context,
producerChannels: producerChannels,
consumers: consumers,
consumerChannels: consumerChannels,
readers: readers,
readerChannels: readerChannels,
unmarshal: unmarshal,
bufSize: bufSize,
receiveBuf: receiveBuf,
streamCancel: streamCancel,
producerLock: &sync.Mutex{},
consumerLock: &sync.Mutex{},
readerLock: &sync.Mutex{},
wait: &sync.WaitGroup{},
}
@ -160,6 +168,39 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
}
}
// AsProducer create producer to send message to channels
func (ms *mqMsgStream) AsReader(channels []string) {
for _, channel := range channels {
if len(channel) == 0 {
log.Error("MsgStream asProducer's channel is a empty string")
break
}
fn := func() error {
r, err := ms.client.CreateReader(mqclient.ReaderOptions{
Topic: channel,
StartMessageID: ms.client.EarliestMessageID(),
})
if err != nil {
return err
}
if r == nil {
return errors.New("reader is nil")
}
ms.readerLock.Lock()
defer ms.readerLock.Unlock()
ms.readers[channel] = r
ms.readerChannels = append(ms.readerChannels, channel)
return nil
}
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
if err != nil {
errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
panic(errMsg)
}
}
}
func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
ms.repackFunc = repackFunc
}
@ -510,6 +551,54 @@ func (ms *mqMsgStream) Chan() <-chan *MsgPack {
return ms.receiveBuf
}
func (ms *mqMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error {
for _, mp := range msgPositions {
reader, ok := ms.readers[mp.ChannelName]
if !ok {
return fmt.Errorf("channel %s not subscribed", mp.ChannelName)
}
messageID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
return err
}
log.Debug("MsgStream reader begin to seek", zap.Any("MessageID", mp.MsgID))
err = reader.Seek(messageID)
if err != nil {
log.Debug("Failed to seek", zap.Error(err))
return err
}
}
return nil
}
func (ms *mqMsgStream) Next(ctx context.Context, channelName string) (TsMsg, error) {
reader, ok := ms.readers[channelName]
if !ok {
return nil, fmt.Errorf("reader for channel %s is not exist", channelName)
}
if reader.HasNext() {
msg, err := reader.Next(ctx)
if err != nil {
return nil, err
}
tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
if err != nil {
log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
return nil, errors.New("Failed to getTsMsgFromConsumerMsg")
}
pos := tsMsg.Position()
tsMsg.SetPosition(&MsgPosition{
ChannelName: pos.ChannelName,
MsgID: pos.MsgID,
Timestamp: tsMsg.BeginTs(),
})
return tsMsg, nil
}
log.Debug("All data has been read, there is no more data", zap.String("channel", channelName))
return nil, nil
}
// Seek reset the subscription associated with this consumer to a specific position
// User has to ensure mq_msgstream is not closed before seek, and the seek position is already written.
func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
@ -671,6 +760,11 @@ func (ms *MqTtMsgStream) Close() {
consumer.Close()
}
}
for _, reader := range ms.readers {
if reader != nil {
reader.Close()
}
}
}
func (ms *MqTtMsgStream) bufMsgPackToChannel() {

View File

@ -1148,6 +1148,58 @@ func TestStream_MqMsgStream_SeekLatest(t *testing.T) {
outputStream2.Close()
}
func TestStream_MqMsgStream_Reader(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
readerChannels := []string{c}
msgPack := &MsgPack{}
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
defer inputStream.Close()
n := 10
p := 5
for i := 0; i < n; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err := inputStream.Produce(msgPack)
assert.Nil(t, err)
readStream := getPulsarReader(pulsarAddress, readerChannels)
defer readStream.Close()
var seekPosition *internalpb.MsgPosition
for i := 0; i < n; i++ {
result, err := readStream.Next(ctx, c)
assert.Nil(t, err)
assert.Equal(t, result.ID(), int64(i))
if i == p {
seekPosition = result.Position()
}
}
result, err := readStream.Next(ctx, c)
assert.Nil(t, err)
assert.Nil(t, result)
readStream2 := getPulsarReader(pulsarAddress, readerChannels)
defer readStream2.Close()
readStream2.SeekReaders([]*internalpb.MsgPosition{seekPosition})
for i := p; i < 10; i++ {
result, err := readStream2.Next(ctx, c)
assert.Nil(t, err)
assert.Equal(t, result.ID(), int64(i))
}
result2, err := readStream2.Next(ctx, c)
assert.Nil(t, err)
assert.Nil(t, result2)
}
/****************************************Rmq test******************************************/
func initRmq(name string) *etcdkv.EtcdKV {
@ -1645,6 +1697,14 @@ func getPulsarOutputStream(pulsarAddress string, consumerChannels []string, cons
return outputStream
}
func getPulsarReader(pulsarAddress string, consumerChannels []string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsReader(consumerChannels)
return outputStream
}
func getPulsarTtOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})

View File

@ -58,6 +58,7 @@ type MsgStream interface {
Chan() <-chan *MsgPack
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
AsReader(channels []string)
AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition)
SetRepackFunc(repackFunc RepackFunc)
ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32
@ -67,7 +68,9 @@ type MsgStream interface {
Broadcast(*MsgPack) error
BroadcastMark(*MsgPack) (map[string][]MessageID, error)
Consume() *MsgPack
Next(ctx context.Context, channelName string) (TsMsg, error)
Seek(offset []*MsgPosition) error
SeekReaders(msgPositions []*internalpb.MsgPosition) error
}
// Factory is an interface that can be used to generate a new msgstream object

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
@ -277,6 +278,16 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) {
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *simpleMockMsgStream) AsReader(channels []string) {
}
func (ms *simpleMockMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error {
return nil
}
func (ms *simpleMockMsgStream) Next(ctx context.Context, channelName string) (msgstream.TsMsg, error) {
return nil, nil
}
func (ms *simpleMockMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
}

View File

@ -13,6 +13,9 @@ package mqclient
// Client is the interface that provides operations of message queues
type Client interface {
// Create a producer instance
CreateReader(options ReaderOptions) (Reader, error)
// Create a producer instance
CreateProducer(options ProducerOptions) (Producer, error)

View File

@ -58,7 +58,7 @@ func (pc *pulsarClient) CreateProducer(options ProducerOptions) (Producer, error
func (pc *pulsarClient) CreateReader(options ReaderOptions) (Reader, error) {
opts := pulsar.ReaderOptions{
Topic: options.Topic,
StartMessageID: options.StartMessageID,
StartMessageID: options.StartMessageID.(*pulsarID).messageID,
StartMessageIDInclusive: options.StartMessageIDInclusive,
}
pr, err := pc.client.CreateReader(opts)

View File

@ -55,7 +55,7 @@ func TestPulsarReader(t *testing.T) {
reader, err := pc.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: pulsar.EarliestMessageID(),
StartMessageID: pc.EarliestMessageID(),
})
assert.Nil(t, err)
assert.NotNil(t, reader)
@ -87,7 +87,7 @@ func TestPulsarReader(t *testing.T) {
readerOfSeek, err := pc.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: pulsar.EarliestMessageID(),
StartMessageID: pc.EarliestMessageID(),
})
assert.Nil(t, err)
defer readerOfSeek.Close()

View File

@ -45,6 +45,11 @@ func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) {
return &rp, nil
}
//TODO fishpenguin: implementation
func (rc *rmqClient) CreateReader(options ReaderOptions) (Reader, error) {
panic("this method has not been implented")
}
// Subscribe subscribes a consumer in rmq client
func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
receiveChannel := make(chan rocksmq.Message, options.BufSize)