Add reader interface (#11973)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/11981/head
godchen 2021-11-17 14:11:11 +08:00 committed by GitHub
parent 53012efbe4
commit a1d1b3d0b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 139 additions and 31 deletions

View File

@ -131,7 +131,7 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
continue
}
fn := func() error {
receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
receiveChannel := make(chan mqclient.Message, ms.bufSize)
pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Topic: channel,
SubscriptionName: subName,
@ -443,7 +443,7 @@ func (ms *mqMsgStream) Consume() *MsgPack {
}
}
func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg mqclient.ConsumerMessage) (TsMsg, error) {
func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg mqclient.Message) (TsMsg, error) {
header := commonpb.MsgHeader{}
err := proto.Unmarshal(msg.Payload(), &header)
if err != nil {
@ -619,7 +619,7 @@ func (ms *MqTtMsgStream) AsConsumerWithPosition(channels []string, subName strin
continue
}
fn := func() error {
receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
receiveChannel := make(chan mqclient.Message, ms.bufSize)
pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Topic: channel,
SubscriptionName: subName,

View File

@ -61,7 +61,7 @@ type ConsumerOptions struct {
// Message for this consumer
// When a message is received, it will be pushed to this channel for consumption
MessageChannel chan ConsumerMessage
MessageChannel chan Message
// Set receive channel size
BufSize int64
@ -77,13 +77,13 @@ type Consumer interface {
Subscription() string
// Message channel
Chan() <-chan ConsumerMessage
Chan() <-chan Message
// Seek to the uniqueID position
Seek(MessageID) error //nolint:govet
// Make sure that msg is received. Only used in pulsar
Ack(ConsumerMessage)
Ack(Message)
// ConsumeAfterSeek defines the behavior whether to consume after seeking is done
ConsumeAfterSeek() bool

View File

@ -12,7 +12,7 @@
package mqclient
// ConsumerMessage is the interface that provides operations of a consumer
type ConsumerMessage interface {
type Message interface {
// Topic get the topic from which this message originated from
Topic() string

View File

@ -62,7 +62,7 @@ func Produce(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
log.Info("Produce done")
}
func VerifyMessage(t *testing.T, msg ConsumerMessage) {
func VerifyMessage(t *testing.T, msg Message) {
pload := BytesToInt(msg.Payload())
log.Info("RECV", zap.Any("v", pload))
pm := msg.(*pulsarMessage)
@ -92,7 +92,7 @@ func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
rand.Seed(time.Now().UnixNano())
cnt := 1 + rand.Int()%5
var msg ConsumerMessage
var msg Message
for i := 0; i < cnt; i++ {
select {
case <-ctx.Done():

View File

@ -20,8 +20,9 @@ import (
)
type PulsarConsumer struct {
c pulsar.Consumer
msgChannel chan ConsumerMessage
c pulsar.Consumer
pulsar.Reader
msgChannel chan Message
hasSeek bool
AtLatest bool
closeCh chan struct{}
@ -32,10 +33,10 @@ func (pc *PulsarConsumer) Subscription() string {
return pc.c.Subscription()
}
func (pc *PulsarConsumer) Chan() <-chan ConsumerMessage {
func (pc *PulsarConsumer) Chan() <-chan Message {
if pc.msgChannel == nil {
pc.once.Do(func() {
pc.msgChannel = make(chan ConsumerMessage, 256)
pc.msgChannel = make(chan Message, 256)
// this part handles msgstream expectation when the consumer is not seeked
// pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked
// yet, our message stream is to setting to the very start point of the topic
@ -85,7 +86,7 @@ func (pc *PulsarConsumer) ConsumeAfterSeek() bool {
return true
}
func (pc *PulsarConsumer) Ack(message ConsumerMessage) {
func (pc *PulsarConsumer) Ack(message Message) {
pm := message.(*pulsarMessage)
pc.c.Ack(pm.msg)
}

View File

@ -16,10 +16,10 @@ import (
)
// Check pulsarMessage implements ConsumerMessage
var _ ConsumerMessage = (*pulsarMessage)(nil)
var _ Message = (*pulsarMessage)(nil)
type pulsarMessage struct {
msg pulsar.ConsumerMessage
msg pulsar.Message
}
func (pm *pulsarMessage) Topic() string {

View File

@ -0,0 +1,53 @@
package mqclient
import (
"context"
)
// ReaderMessage package Reader and Message as a struct to use
type ReaderMessage struct {
Reader
Message
}
// ReaderOptions abstraction Reader options to use.
type ReaderOptions struct {
// Topic specify the topic this consumer will subscribe on.
// This argument is required when constructing the reader.
Topic string
// Name set the reader name.
Name string
// Attach a set of application defined properties to the reader
// This properties will be visible in the topic stats
Properties map[string]string
// StartMessageID initial reader positioning is done by specifying a message id. The options are:
// * `MessageID` : Start reading from a particular message id, the reader will position itself on that
// specific position. The first message to be read will be the message next to the specified
// messageID
StartMessageID MessageID
// If true, the reader will start at the `StartMessageID`, included.
// Default is `false` and the reader will start from the "next" message
StartMessageIDInclusive bool
}
// Reader can be used to scan through all the messages currently available in a topic.
type Reader interface {
// Topic from which this reader is reading from
Topic() string
// Next read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
// HasNext check if there is any message available to read from the current position
HasNext() bool
// Close the reader and stop the broker to push more messages
Close()
// Reset the subscription associated with this reader to a specific message id.
Seek(MessageID) error
}

View File

@ -47,7 +47,7 @@ func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) {
// Subscribe subscribes a consumer in rmq client
func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
receiveChannel := make(chan rocksmq.ConsumerMessage, options.BufSize)
receiveChannel := make(chan rocksmq.Message, options.BufSize)
cli, err := rc.client.Subscribe(rocksmq.ConsumerOptions{
Topic: options.Topic,

View File

@ -20,7 +20,7 @@ import (
// RmqConsumer is a client that used to consume messages from rocksmq
type RmqConsumer struct {
c rocksmq.Consumer
msgChannel chan ConsumerMessage
msgChannel chan Message
closeCh chan struct{}
once sync.Once
}
@ -31,10 +31,10 @@ func (rc *RmqConsumer) Subscription() string {
}
// Chan returns a channel to read messages from rocksmq
func (rc *RmqConsumer) Chan() <-chan ConsumerMessage {
func (rc *RmqConsumer) Chan() <-chan Message {
if rc.msgChannel == nil {
rc.once.Do(func() {
rc.msgChannel = make(chan ConsumerMessage, 256)
rc.msgChannel = make(chan Message, 256)
go func() {
for { //nolint:gosimple
select {
@ -67,7 +67,7 @@ func (rc *RmqConsumer) ConsumeAfterSeek() bool {
}
// Ack is used to ask a rocksmq message
func (rc *RmqConsumer) Ack(message ConsumerMessage) {
func (rc *RmqConsumer) Ack(message Message) {
}
// Close is used to free the resources of this consumer

View File

@ -16,11 +16,11 @@ import (
)
// Check rmqMessage implements ConsumerMessage
var _ ConsumerMessage = (*rmqMessage)(nil)
var _ Message = (*rmqMessage)(nil)
// rmqMessage wraps the message for rocksmq
type rmqMessage struct {
msg rocksmq.ConsumerMessage
msg rocksmq.Message
}
func (rm *rmqMessage) Topic() string {

View File

@ -147,7 +147,7 @@ func (c *client) consume(consumer *consumer) {
break
}
for _, msg := range msgs {
consumer.messageCh <- ConsumerMessage{
consumer.messageCh <- Message{
MsgID: msg.MsgID,
Payload: msg.Payload,
Topic: consumer.Topic(),

View File

@ -44,11 +44,11 @@ type ConsumerOptions struct {
// Message for this consumer
// When a message is received, it will be pushed to this channel for consumption
MessageChannel chan ConsumerMessage
MessageChannel chan Message
}
// ConsumerMessage is the message content of a consumer message
type ConsumerMessage struct {
type Message struct {
Consumer
MsgID UniqueID
Topic string
@ -67,7 +67,7 @@ type Consumer interface {
MsgMutex() chan struct{}
// Message channel
Chan() <-chan ConsumerMessage
Chan() <-chan Message
// Seek to the uniqueID position
Seek(UniqueID) error //nolint:govet

View File

@ -28,7 +28,7 @@ type consumer struct {
startOnce sync.Once
msgMutex chan struct{}
messageCh chan ConsumerMessage
messageCh chan Message
}
func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
@ -46,7 +46,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
messageCh := options.MessageChannel
if options.MessageChannel == nil {
messageCh = make(chan ConsumerMessage, 1)
messageCh = make(chan Message, 1)
}
return &consumer{
@ -75,7 +75,7 @@ func getExistedConsumer(c *client, options ConsumerOptions, msgMutex chan struct
messageCh := options.MessageChannel
if options.MessageChannel == nil {
messageCh = make(chan ConsumerMessage, 1)
messageCh = make(chan Message, 1)
}
return &consumer{
@ -104,7 +104,7 @@ func (c *consumer) MsgMutex() chan struct{} {
}
// Chan start consume goroutine and return message channel
func (c *consumer) Chan() <-chan ConsumerMessage {
func (c *consumer) Chan() <-chan Message {
c.startOnce.Do(func() {
c.client.wg.Add(1)
go c.client.consume(c)

View File

@ -0,0 +1,54 @@
package rocksmq
import (
"context"
)
// ReaderMessage package Reader and Message as a struct to use
type ReaderMessage struct {
Reader
Message
}
// ReaderOptions abstraction Reader options to use.
type ReaderOptions struct {
// Topic specify the topic this consumer will subscribe on.
// This argument is required when constructing the reader.
Topic string
// Name set the reader name.
Name string
// Attach a set of application defined properties to the reader
// This properties will be visible in the topic stats
Properties map[string]string
// StartMessageID initial reader positioning is done by specifying a message id. The options are:
// * `MessageID` : Start reading from a particular message id, the reader will position itself on that
// specific position. The first message to be read will be the message next to the specified
// messageID
StartMessageID UniqueID
// If true, the reader will start at the `StartMessageID`, included.
// Default is `false` and the reader will start from the "next" message
StartMessageIDInclusive bool
}
// Reader can be used to scan through all the messages currently available in a topic.
type Reader interface {
// Topic from which this reader is reading from
Topic() string
// Next read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
// HasNext check if there is any message available to read from the current position
HasNext() bool
// Close the reader and stop the broker to push more messages
Close()
// Reset the subscription associated with this reader to a specific message id.
Seek(UniqueID) error //nolint:govet
}