From e3f49858c5251e49d69fb70091b7e4de13174680 Mon Sep 17 00:00:00 2001 From: godchen Date: Wed, 24 Nov 2021 17:47:15 +0800 Subject: [PATCH] Add subscription prefix (#12251) Signed-off-by: godchen --- internal/datanode/flow_graph_dmstream_input_node_test.go | 2 +- internal/msgstream/mq_msgstream.go | 7 ++++--- internal/msgstream/mq_msgstream_test.go | 2 +- internal/msgstream/msgstream.go | 2 +- internal/proxy/mock_test.go | 2 +- internal/querynode/segment_loader.go | 2 +- internal/util/mqclient/pulsar_client.go | 1 + internal/util/mqclient/reader.go | 3 +++ internal/util/mqclient/rmq_client.go | 1 + internal/util/rocksmq/client/rocksmq/reader.go | 3 +++ internal/util/rocksmq/client/rocksmq/reader_impl.go | 4 +++- internal/util/rocksmq/server/rocksmq/rocksmq.go | 2 +- internal/util/rocksmq/server/rocksmq/rocksmq_impl.go | 4 ++-- internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go | 2 +- 14 files changed, 24 insertions(+), 13 deletions(-) diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index ce2e17e00b..0f8d13fbf2 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -67,7 +67,7 @@ func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack { } func (mtm *mockTtMsgStream) AsProducer(channels []string) {} -func (mtm *mockTtMsgStream) AsReader(channels []string) {} +func (mtm *mockTtMsgStream) AsReader(channels []string, subName string) {} func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {} func (mtm *mockTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) { } diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 059d3278b5..7d7ea3f8d4 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -167,7 +167,7 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string, } // AsProducer create producer to send message to channels -func (ms *mqMsgStream) AsReader(channels []string) { +func (ms *mqMsgStream) AsReader(channels []string, subName string) { for _, channel := range channels { if len(channel) == 0 { log.Error("MsgStream asProducer's channel is a empty string") @@ -175,8 +175,9 @@ func (ms *mqMsgStream) AsReader(channels []string) { } fn := func() error { r, err := ms.client.CreateReader(mqclient.ReaderOptions{ - Topic: channel, - StartMessageID: ms.client.EarliestMessageID(), + Topic: channel, + StartMessageID: ms.client.EarliestMessageID(), + SubscriptionRolePrefix: subName, }) if err != nil { return err diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index aa94174024..88484ba814 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -1935,7 +1935,7 @@ func getPulsarReader(pulsarAddress string, consumerChannels []string) MsgStream factory := ProtoUDFactory{} pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - outputStream.AsReader(consumerChannels) + outputStream.AsReader(consumerChannels, "pulsar-reader-prefix-") return outputStream } diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 1a22fd6dbc..b49115d723 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -58,7 +58,7 @@ type MsgStream interface { Chan() <-chan *MsgPack AsProducer(channels []string) AsConsumer(channels []string, subName string) - AsReader(channels []string) + AsReader(channels []string, subName string) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) SetRepackFunc(repackFunc RepackFunc) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index 320680c6ff..0514524330 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -278,7 +278,7 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) { func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) { } -func (ms *simpleMockMsgStream) AsReader(channels []string) { +func (ms *simpleMockMsgStream) AsReader(channels []string, subName string) { } func (ms *simpleMockMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error { diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 1713feff12..515ef76976 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -449,7 +449,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection } pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName) position.ChannelName = pChannelName - stream.AsReader([]string{pChannelName}) + stream.AsReader([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeID, collectionID)) stream.SeekReaders([]*internalpb.MsgPosition{position}) delData := &deleteData{ diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go index 34ff76ee3f..10b85ad8b3 100644 --- a/internal/util/mqclient/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -60,6 +60,7 @@ func (pc *pulsarClient) CreateReader(options ReaderOptions) (Reader, error) { Topic: options.Topic, StartMessageID: options.StartMessageID.(*pulsarID).messageID, StartMessageIDInclusive: options.StartMessageIDInclusive, + SubscriptionRolePrefix: options.SubscriptionRolePrefix, } pr, err := pc.client.CreateReader(opts) if err != nil { diff --git a/internal/util/mqclient/reader.go b/internal/util/mqclient/reader.go index 1e17cf1b4a..7aacf7fc93 100644 --- a/internal/util/mqclient/reader.go +++ b/internal/util/mqclient/reader.go @@ -32,6 +32,9 @@ type ReaderOptions struct { // If true, the reader will start at the `StartMessageID`, included. // Default is `false` and the reader will start from the "next" message StartMessageIDInclusive bool + + // SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader". + SubscriptionRolePrefix string } // Reader can be used to scan through all the messages currently available in a topic. diff --git a/internal/util/mqclient/rmq_client.go b/internal/util/mqclient/rmq_client.go index ac8ee26d3c..fc94e3e28c 100644 --- a/internal/util/mqclient/rmq_client.go +++ b/internal/util/mqclient/rmq_client.go @@ -52,6 +52,7 @@ func (rc *rmqClient) CreateReader(options ReaderOptions) (Reader, error) { Topic: options.Topic, StartMessageID: options.StartMessageID.(*rmqID).messageID, StartMessageIDInclusive: options.StartMessageIDInclusive, + SubscriptionRolePrefix: options.SubscriptionRolePrefix, } pr, err := rc.client.CreateReader(opts) if err != nil { diff --git a/internal/util/rocksmq/client/rocksmq/reader.go b/internal/util/rocksmq/client/rocksmq/reader.go index a183d53b40..c000e263f2 100644 --- a/internal/util/rocksmq/client/rocksmq/reader.go +++ b/internal/util/rocksmq/client/rocksmq/reader.go @@ -43,6 +43,9 @@ type ReaderOptions struct { // If true, the reader will start at the `StartMessageID`, included. // Default is `false` and the reader will start from the "next" message StartMessageIDInclusive bool + + // SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader". + SubscriptionRolePrefix string } // Reader can be used to scan through all the messages currently available in a topic. diff --git a/internal/util/rocksmq/client/rocksmq/reader_impl.go b/internal/util/rocksmq/client/rocksmq/reader_impl.go index 3da1d58f1f..00fbd599b2 100644 --- a/internal/util/rocksmq/client/rocksmq/reader_impl.go +++ b/internal/util/rocksmq/client/rocksmq/reader_impl.go @@ -19,6 +19,7 @@ type reader struct { name string startMessageID UniqueID startMessageIDInclusive bool + subscriptionRolePrefix string } func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) { @@ -37,11 +38,12 @@ func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) { name: readerOptions.Name, startMessageID: readerOptions.StartMessageID, startMessageIDInclusive: readerOptions.StartMessageIDInclusive, + subscriptionRolePrefix: readerOptions.SubscriptionRolePrefix, } if c.server == nil { return nil, newError(InvalidConfiguration, "rmq server in client is nil") } - name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive) + name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive, reader.subscriptionRolePrefix) if err != nil { return nil, err } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq.go b/internal/util/rocksmq/server/rocksmq/rocksmq.go index ae106e2135..89e7faa842 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq.go @@ -50,7 +50,7 @@ type RocksMQ interface { Notify(topicName, groupName string) - CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error) + CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) ReaderSeek(topicName string, readerName string, msgID UniqueID) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) HasNext(topicName string, readerName string, messageIDInclusive bool) bool diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 8a197bbe4e..e99d2cec02 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -932,7 +932,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) return nil } -func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error) { +func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) { readOpts := gorocksdb.NewDefaultReadOptions() readOpts.SetPrefixSameAsStart(true) iter := rmq.store.NewIterator(readOpts) @@ -950,7 +950,7 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI if err != nil { return "", errors.New("Can't get current ts from rocksmq idAllocator") } - readerName := ReaderNamePrefix + strconv.FormatInt(nowTs, 10) + readerName := subscriptionRolePrefix + ReaderNamePrefix + strconv.FormatInt(nowTs, 10) reader := &rocksmqReader{ store: rmq.store, diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index 82a08d0341..c8497734d0 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -734,7 +734,7 @@ func TestRocksmq_Reader(t *testing.T) { defer rmq.DestroyTopic(channelName) loopNum := 100 - readerName, err := rmq.CreateReader(channelName, 0, true) + readerName, err := rmq.CreateReader(channelName, 0, true, "test-sub-name") assert.NoError(t, err) pMsgs := make([]ProducerMessage, loopNum)