Add subscription prefix (#12251)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/12258/head
godchen 2021-11-24 17:47:15 +08:00 committed by GitHub
parent 4088492c7c
commit e3f49858c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 24 additions and 13 deletions

View File

@ -67,7 +67,7 @@ func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
}
func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
func (mtm *mockTtMsgStream) AsReader(channels []string) {}
func (mtm *mockTtMsgStream) AsReader(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
}

View File

@ -167,7 +167,7 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
}
// AsProducer create producer to send message to channels
func (ms *mqMsgStream) AsReader(channels []string) {
func (ms *mqMsgStream) AsReader(channels []string, subName string) {
for _, channel := range channels {
if len(channel) == 0 {
log.Error("MsgStream asProducer's channel is a empty string")
@ -175,8 +175,9 @@ func (ms *mqMsgStream) AsReader(channels []string) {
}
fn := func() error {
r, err := ms.client.CreateReader(mqclient.ReaderOptions{
Topic: channel,
StartMessageID: ms.client.EarliestMessageID(),
Topic: channel,
StartMessageID: ms.client.EarliestMessageID(),
SubscriptionRolePrefix: subName,
})
if err != nil {
return err

View File

@ -1935,7 +1935,7 @@ func getPulsarReader(pulsarAddress string, consumerChannels []string) MsgStream
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsReader(consumerChannels)
outputStream.AsReader(consumerChannels, "pulsar-reader-prefix-")
return outputStream
}

View File

@ -58,7 +58,7 @@ type MsgStream interface {
Chan() <-chan *MsgPack
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
AsReader(channels []string)
AsReader(channels []string, subName string)
AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition)
SetRepackFunc(repackFunc RepackFunc)
ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32

View File

@ -278,7 +278,7 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) {
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *simpleMockMsgStream) AsReader(channels []string) {
func (ms *simpleMockMsgStream) AsReader(channels []string, subName string) {
}
func (ms *simpleMockMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error {

View File

@ -449,7 +449,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
}
pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName)
position.ChannelName = pChannelName
stream.AsReader([]string{pChannelName})
stream.AsReader([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeID, collectionID))
stream.SeekReaders([]*internalpb.MsgPosition{position})
delData := &deleteData{

View File

@ -60,6 +60,7 @@ func (pc *pulsarClient) CreateReader(options ReaderOptions) (Reader, error) {
Topic: options.Topic,
StartMessageID: options.StartMessageID.(*pulsarID).messageID,
StartMessageIDInclusive: options.StartMessageIDInclusive,
SubscriptionRolePrefix: options.SubscriptionRolePrefix,
}
pr, err := pc.client.CreateReader(opts)
if err != nil {

View File

@ -32,6 +32,9 @@ type ReaderOptions struct {
// If true, the reader will start at the `StartMessageID`, included.
// Default is `false` and the reader will start from the "next" message
StartMessageIDInclusive bool
// SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
}
// Reader can be used to scan through all the messages currently available in a topic.

View File

@ -52,6 +52,7 @@ func (rc *rmqClient) CreateReader(options ReaderOptions) (Reader, error) {
Topic: options.Topic,
StartMessageID: options.StartMessageID.(*rmqID).messageID,
StartMessageIDInclusive: options.StartMessageIDInclusive,
SubscriptionRolePrefix: options.SubscriptionRolePrefix,
}
pr, err := rc.client.CreateReader(opts)
if err != nil {

View File

@ -43,6 +43,9 @@ type ReaderOptions struct {
// If true, the reader will start at the `StartMessageID`, included.
// Default is `false` and the reader will start from the "next" message
StartMessageIDInclusive bool
// SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
}
// Reader can be used to scan through all the messages currently available in a topic.

View File

@ -19,6 +19,7 @@ type reader struct {
name string
startMessageID UniqueID
startMessageIDInclusive bool
subscriptionRolePrefix string
}
func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) {
@ -37,11 +38,12 @@ func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) {
name: readerOptions.Name,
startMessageID: readerOptions.StartMessageID,
startMessageIDInclusive: readerOptions.StartMessageIDInclusive,
subscriptionRolePrefix: readerOptions.SubscriptionRolePrefix,
}
if c.server == nil {
return nil, newError(InvalidConfiguration, "rmq server in client is nil")
}
name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive)
name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive, reader.subscriptionRolePrefix)
if err != nil {
return nil, err
}

View File

@ -50,7 +50,7 @@ type RocksMQ interface {
Notify(topicName, groupName string)
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error)
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error)
ReaderSeek(topicName string, readerName string, msgID UniqueID)
Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error)
HasNext(topicName string, readerName string, messageIDInclusive bool) bool

View File

@ -932,7 +932,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
return nil
}
func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error) {
func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) {
readOpts := gorocksdb.NewDefaultReadOptions()
readOpts.SetPrefixSameAsStart(true)
iter := rmq.store.NewIterator(readOpts)
@ -950,7 +950,7 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI
if err != nil {
return "", errors.New("Can't get current ts from rocksmq idAllocator")
}
readerName := ReaderNamePrefix + strconv.FormatInt(nowTs, 10)
readerName := subscriptionRolePrefix + ReaderNamePrefix + strconv.FormatInt(nowTs, 10)
reader := &rocksmqReader{
store: rmq.store,

View File

@ -734,7 +734,7 @@ func TestRocksmq_Reader(t *testing.T) {
defer rmq.DestroyTopic(channelName)
loopNum := 100
readerName, err := rmq.CreateReader(channelName, 0, true)
readerName, err := rmq.CreateReader(channelName, 0, true, "test-sub-name")
assert.NoError(t, err)
pMsgs := make([]ProducerMessage, loopNum)