Reorganize mqclient

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
pull/4973/head^2
Xiangyu Wang 2021-04-02 10:01:11 +08:00 committed by yefu.chen
parent 84f950bc4f
commit 8b62790017
24 changed files with 143 additions and 157 deletions

View File

@ -1,11 +0,0 @@
package pulsar
import "github.com/apache/pulsar-client-go/pulsar"
type pulsarID struct {
messageID pulsar.MessageID
}
func (pid *pulsarID) Serialize() []byte {
return pid.messageID.Serialize()
}

View File

@ -1,14 +0,0 @@
package rocksmq
import (
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type rmqID struct {
messageID rocksmq.UniqueID
}
func (rid *rmqID) Serialize() []byte {
return typeutil.SerializeRmqID(rid.messageID)
}

View File

@ -12,18 +12,18 @@ import (
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/mqclient"
"github.com/zilliztech/milvus-distributed/internal/util/trace" "github.com/zilliztech/milvus-distributed/internal/util/trace"
"go.uber.org/zap" "go.uber.org/zap"
) )
type MessageID = client.MessageID type MessageID = mqclient.MessageID
type Client = client.Client type Client = mqclient.Client
type Producer = client.Producer type Producer = mqclient.Producer
type Consumer = client.Consumer type Consumer = mqclient.Consumer
type TsMsg = msgstream.TsMsg type TsMsg = msgstream.TsMsg
type MsgPack = msgstream.MsgPack type MsgPack = msgstream.MsgPack
type MsgType = msgstream.MsgType type MsgType = msgstream.MsgType
@ -88,7 +88,7 @@ func NewMsgStream(ctx context.Context,
func (ms *msgStream) AsProducer(channels []string) { func (ms *msgStream) AsProducer(channels []string) {
for _, channel := range channels { for _, channel := range channels {
fn := func() error { fn := func() error {
pp, err := ms.client.CreateProducer(client.ProducerOptions{Topic: channel}) pp, err := ms.client.CreateProducer(mqclient.ProducerOptions{Topic: channel})
if err != nil { if err != nil {
return err return err
} }
@ -117,12 +117,12 @@ func (ms *msgStream) AsConsumer(channels []string,
continue continue
} }
fn := func() error { fn := func() error {
receiveChannel := make(chan client.ConsumerMessage, ms.bufSize) receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
pc, err := ms.client.Subscribe(client.ConsumerOptions{ pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Topic: channel, Topic: channel,
SubscriptionName: subName, SubscriptionName: subName,
Type: client.KeyShared, Type: mqclient.KeyShared,
SubscriptionInitialPosition: client.SubscriptionPositionEarliest, SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
MessageChannel: receiveChannel, MessageChannel: receiveChannel,
}) })
if err != nil { if err != nil {
@ -234,7 +234,7 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error {
return err return err
} }
msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
@ -266,7 +266,7 @@ func (ms *msgStream) Broadcast(msgPack *MsgPack) error {
return err return err
} }
msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
@ -424,12 +424,12 @@ func (ms *TtMsgStream) AsConsumer(channels []string,
continue continue
} }
fn := func() error { fn := func() error {
receiveChannel := make(chan client.ConsumerMessage, ms.bufSize) receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
pc, err := ms.client.Subscribe(client.ConsumerOptions{ pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Topic: channel, Topic: channel,
SubscriptionName: subName, SubscriptionName: subName,
Type: client.KeyShared, Type: mqclient.KeyShared,
SubscriptionInitialPosition: client.SubscriptionPositionEarliest, SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
MessageChannel: receiveChannel, MessageChannel: receiveChannel,
}) })
if err != nil { if err != nil {
@ -676,12 +676,12 @@ func (ms *TtMsgStream) Seek(mp *internalpb.MsgPosition) error {
} }
fn := func() error { fn := func() error {
receiveChannel := make(chan client.ConsumerMessage, ms.bufSize) receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
consumer, err = ms.client.Subscribe(client.ConsumerOptions{ consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{
Topic: seekChannel, Topic: seekChannel,
SubscriptionName: subName, SubscriptionName: subName,
SubscriptionInitialPosition: client.SubscriptionPositionEarliest, SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Type: client.KeyShared, Type: mqclient.KeyShared,
MessageChannel: receiveChannel, MessageChannel: receiveChannel,
}) })
if err != nil { if err != nil {

View File

@ -9,7 +9,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/allocator"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
rocksmq2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/rocksmq"
client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
@ -18,7 +17,8 @@ import (
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
pulsar2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/pulsar"
"github.com/zilliztech/milvus-distributed/internal/util/mqclient"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -187,7 +187,7 @@ func initPulsarStream(pulsarAddress string,
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
// set input stream // set input stream
pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels) inputStream.AsProducer(producerChannels)
for _, opt := range opts { for _, opt := range opts {
@ -197,7 +197,7 @@ func initPulsarStream(pulsarAddress string,
var input msgstream.MsgStream = inputStream var input msgstream.MsgStream = inputStream
// set output stream // set output stream
pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start() outputStream.Start()
@ -214,7 +214,7 @@ func initPulsarTtStream(pulsarAddress string,
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
// set input stream // set input stream
pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels) inputStream.AsProducer(producerChannels)
for _, opt := range opts { for _, opt := range opts {
@ -224,7 +224,7 @@ func initPulsarTtStream(pulsarAddress string,
var input msgstream.MsgStream = inputStream var input msgstream.MsgStream = inputStream
// set output stream // set output stream
pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewTtMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream, _ := NewTtMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start() outputStream.Start()
@ -433,12 +433,12 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels) inputStream.AsProducer(producerChannels)
inputStream.Start() inputStream.Start()
pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start() outputStream.Start()
@ -487,12 +487,12 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, deleteMsg) msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels) inputStream.AsProducer(producerChannels)
inputStream.Start() inputStream.Start()
pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start() outputStream.Start()
@ -521,12 +521,12 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4, 4)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4, 4))
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels) inputStream.AsProducer(producerChannels)
inputStream.Start() inputStream.Start()
pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start() outputStream.Start()
@ -701,7 +701,7 @@ func initRmqStream(producerChannels []string,
opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) { opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) {
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
rmqClient, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) rmqClient, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq})
inputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) inputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels) inputStream.AsProducer(producerChannels)
for _, opt := range opts { for _, opt := range opts {
@ -710,7 +710,7 @@ func initRmqStream(producerChannels []string,
inputStream.Start() inputStream.Start()
var input msgstream.MsgStream = inputStream var input msgstream.MsgStream = inputStream
rmqClient2, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) rmqClient2, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq})
outputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) outputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerGroupName) outputStream.AsConsumer(consumerChannels, consumerGroupName)
outputStream.Start() outputStream.Start()
@ -725,7 +725,7 @@ func initRmqTtStream(producerChannels []string,
opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) { opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) {
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
rmqClient, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) rmqClient, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq})
inputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) inputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels) inputStream.AsProducer(producerChannels)
for _, opt := range opts { for _, opt := range opts {
@ -734,7 +734,7 @@ func initRmqTtStream(producerChannels []string,
inputStream.Start() inputStream.Start()
var input msgstream.MsgStream = inputStream var input msgstream.MsgStream = inputStream
rmqClient2, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) rmqClient2, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq})
outputStream, _ := NewTtMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) outputStream, _ := NewTtMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerGroupName) outputStream.AsConsumer(consumerChannels, consumerGroupName)
outputStream.Start() outputStream.Start()

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
pulsar2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/pulsar"
"github.com/zilliztech/milvus-distributed/internal/msgstream/ms" "github.com/zilliztech/milvus-distributed/internal/msgstream/ms"
pulsar2 "github.com/zilliztech/milvus-distributed/internal/util/mqclient"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"

View File

@ -19,8 +19,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/mqclient"
"github.com/zilliztech/milvus-distributed/internal/util/trace" "github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
type TsMsg = msgstream.TsMsg type TsMsg = msgstream.TsMsg
@ -471,7 +471,7 @@ func (ms *PulsarMsgStream) Chan() <-chan *MsgPack {
func (ms *PulsarMsgStream) Seek(mp *internalpb.MsgPosition) error { func (ms *PulsarMsgStream) Seek(mp *internalpb.MsgPosition) error {
if _, ok := ms.consumers[mp.ChannelName]; ok { if _, ok := ms.consumers[mp.ChannelName]; ok {
consumer := ms.consumers[mp.ChannelName] consumer := ms.consumers[mp.ChannelName]
messageID, err := typeutil.DeserializePulsarMsgID(mp.MsgID) messageID, err := mqclient.DeserializePulsarMsgID(mp.MsgID)
if err != nil { if err != nil {
return err return err
} }
@ -776,7 +776,7 @@ func (ms *PulsarTtMsgStream) Seek(mp *internalpb.MsgPosition) error {
if consumer == nil { if consumer == nil {
return errors.New("pulsar is not ready, consumer is nil") return errors.New("pulsar is not ready, consumer is nil")
} }
seekMsgID, err := typeutil.DeserializePulsarMsgID(mp.MsgID) seekMsgID, err := mqclient.DeserializePulsarMsgID(mp.MsgID)
if err != nil { if err != nil {
return err return err
} }

View File

@ -3,8 +3,8 @@ package rmqms
import ( import (
"context" "context"
rocksmq2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/rocksmq"
"github.com/zilliztech/milvus-distributed/internal/msgstream/ms" "github.com/zilliztech/milvus-distributed/internal/msgstream/ms"
rocksmq2 "github.com/zilliztech/milvus-distributed/internal/util/mqclient"
client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"

View File

@ -17,8 +17,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/mqclient"
client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
type TsMsg = msgstream.TsMsg type TsMsg = msgstream.TsMsg
@ -301,7 +301,7 @@ func (rms *RmqMsgStream) receiveMsg(consumer Consumer) {
tsMsg.SetPosition(&msgstream.MsgPosition{ tsMsg.SetPosition(&msgstream.MsgPosition{
ChannelName: filepath.Base(consumer.Topic()), ChannelName: filepath.Base(consumer.Topic()),
MsgID: typeutil.SerializeRmqID(rmqMsg.MsgID), MsgID: mqclient.SerializeRmqID(rmqMsg.MsgID),
}) })
msgPack := MsgPack{Msgs: []TsMsg{tsMsg}} msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
@ -317,7 +317,7 @@ func (rms *RmqMsgStream) Chan() <-chan *msgstream.MsgPack {
func (rms *RmqMsgStream) Seek(mp *msgstream.MsgPosition) error { func (rms *RmqMsgStream) Seek(mp *msgstream.MsgPosition) error {
if _, ok := rms.consumers[mp.ChannelName]; ok { if _, ok := rms.consumers[mp.ChannelName]; ok {
consumer := rms.consumers[mp.ChannelName] consumer := rms.consumers[mp.ChannelName]
msgID, err := typeutil.DeserializeRmqID(mp.MsgID) msgID, err := mqclient.DeserializeRmqID(mp.MsgID)
if err != nil { if err != nil {
return err return err
} }
@ -544,7 +544,7 @@ func (rtms *RmqTtMsgStream) findTimeTick(consumer Consumer,
tsMsg.SetPosition(&msgstream.MsgPosition{ tsMsg.SetPosition(&msgstream.MsgPosition{
ChannelName: filepath.Base(consumer.Topic()), ChannelName: filepath.Base(consumer.Topic()),
MsgID: typeutil.SerializeRmqID(rmqMsg.MsgID), MsgID: mqclient.SerializeRmqID(rmqMsg.MsgID),
}) })
rtms.unsolvedMutex.Lock() rtms.unsolvedMutex.Lock()
@ -590,7 +590,7 @@ func (rtms *RmqTtMsgStream) Seek(mp *msgstream.MsgPosition) error {
if consumer == nil { if consumer == nil {
return errors.New("RocksMQ is not ready, consumer is nil") return errors.New("RocksMQ is not ready, consumer is nil")
} }
seekMsgID, err := typeutil.DeserializeRmqID(mp.MsgID) seekMsgID, err := mqclient.DeserializeRmqID(mp.MsgID)
if err != nil { if err != nil {
return err return err
} }
@ -629,7 +629,7 @@ func (rtms *RmqTtMsgStream) Seek(mp *msgstream.MsgPosition) error {
if tsMsg.BeginTs() > mp.Timestamp { if tsMsg.BeginTs() > mp.Timestamp {
tsMsg.SetPosition(&msgstream.MsgPosition{ tsMsg.SetPosition(&msgstream.MsgPosition{
ChannelName: filepath.Base(consumer.Topic()), ChannelName: filepath.Base(consumer.Topic()),
MsgID: typeutil.SerializeRmqID(rmqMsg.MsgID), MsgID: mqclient.SerializeRmqID(rmqMsg.MsgID),
}) })
rtms.unsolvedBuf[consumer] = append(rtms.unsolvedBuf[consumer], tsMsg) rtms.unsolvedBuf[consumer] = append(rtms.unsolvedBuf[consumer], tsMsg)
} }

View File

@ -1,4 +1,4 @@
package client package mqclient
type Client interface { type Client interface {
// Create a producer instance // Create a producer instance

View File

@ -1,4 +1,4 @@
package client package mqclient
type SubscriptionInitialPosition int type SubscriptionInitialPosition int

View File

@ -1,4 +1,4 @@
package client package mqclient
type MessageID interface { type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else // Serialize the message id into a sequence of bytes that can be stored somewhere else

View File

@ -1,4 +1,4 @@
package client package mqclient
type ConsumerMessage interface { type ConsumerMessage interface {
// Topic get the topic from which this message originated from // Topic get the topic from which this message originated from

View File

@ -1,4 +1,4 @@
package client package mqclient
import "context" import "context"

View File

@ -1,4 +1,4 @@
package pulsar package mqclient
import ( import (
"errors" "errors"
@ -7,8 +7,6 @@ import (
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -26,7 +24,7 @@ func NewPulsarClient(opts pulsar.ClientOptions) (*pulsarClient, error) {
return cli, nil return cli, nil
} }
func (pc *pulsarClient) CreateProducer(options client.ProducerOptions) (client.Producer, error) { func (pc *pulsarClient) CreateProducer(options ProducerOptions) (Producer, error) {
opts := pulsar.ProducerOptions{Topic: options.Topic} opts := pulsar.ProducerOptions{Topic: options.Topic}
pp, err := pc.client.CreateProducer(opts) pp, err := pc.client.CreateProducer(opts)
if err != nil { if err != nil {
@ -39,7 +37,7 @@ func (pc *pulsarClient) CreateProducer(options client.ProducerOptions) (client.P
return producer, nil return producer, nil
} }
func (pc *pulsarClient) Subscribe(options client.ConsumerOptions) (client.Consumer, error) { func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize) receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize)
consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{
Topic: options.Topic, Topic: options.Topic,
@ -51,7 +49,7 @@ func (pc *pulsarClient) Subscribe(options client.ConsumerOptions) (client.Consum
if err != nil { if err != nil {
return nil, err return nil, err
} }
msgChannel := make(chan client.ConsumerMessage, 1) msgChannel := make(chan ConsumerMessage, 1)
pConsumer := &pulsarConsumer{c: consumer, msgChannel: msgChannel} pConsumer := &pulsarConsumer{c: consumer, msgChannel: msgChannel}
go func() { go func() {
@ -70,21 +68,21 @@ func (pc *pulsarClient) Subscribe(options client.ConsumerOptions) (client.Consum
return pConsumer, nil return pConsumer, nil
} }
func (pc *pulsarClient) EarliestMessageID() client.MessageID { func (pc *pulsarClient) EarliestMessageID() MessageID {
msgID := pulsar.EarliestMessageID() msgID := pulsar.EarliestMessageID()
return &pulsarID{messageID: msgID} return &pulsarID{messageID: msgID}
} }
func (pc *pulsarClient) StringToMsgID(id string) (client.MessageID, error) { func (pc *pulsarClient) StringToMsgID(id string) (MessageID, error) {
pID, err := typeutil.StringToPulsarMsgID(id) pID, err := StringToPulsarMsgID(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &pulsarID{messageID: pID}, nil return &pulsarID{messageID: pID}, nil
} }
func (pc *pulsarClient) BytesToMsgID(id []byte) (client.MessageID, error) { func (pc *pulsarClient) BytesToMsgID(id []byte) (MessageID, error) {
pID, err := typeutil.DeserializePulsarMsgID(id) pID, err := DeserializePulsarMsgID(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,29 +1,28 @@
package pulsar package mqclient
import ( import (
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
) )
type pulsarConsumer struct { type pulsarConsumer struct {
c pulsar.Consumer c pulsar.Consumer
msgChannel chan client.ConsumerMessage msgChannel chan ConsumerMessage
} }
func (pc *pulsarConsumer) Subscription() string { func (pc *pulsarConsumer) Subscription() string {
return pc.c.Subscription() return pc.c.Subscription()
} }
func (pc *pulsarConsumer) Chan() <-chan client.ConsumerMessage { func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
return pc.msgChannel return pc.msgChannel
} }
func (pc *pulsarConsumer) Seek(id client.MessageID) error { func (pc *pulsarConsumer) Seek(id MessageID) error {
messageID := id.(*pulsarID).messageID messageID := id.(*pulsarID).messageID
return pc.c.Seek(messageID) return pc.c.Seek(messageID)
} }
func (pc *pulsarConsumer) Ack(message client.ConsumerMessage) { func (pc *pulsarConsumer) Ack(message ConsumerMessage) {
pm := message.(*pulsarMessage) pm := message.(*pulsarMessage)
pc.c.Ack(pm.msg) pc.c.Ack(pm.msg)
} }

View File

@ -0,0 +1,31 @@
package mqclient
import (
"strings"
"github.com/apache/pulsar-client-go/pulsar"
)
type pulsarID struct {
messageID pulsar.MessageID
}
func (pid *pulsarID) Serialize() []byte {
return pid.messageID.Serialize()
}
func SerializePulsarMsgID(messageID pulsar.MessageID) []byte {
return messageID.Serialize()
}
func DeserializePulsarMsgID(messageID []byte) (pulsar.MessageID, error) {
return pulsar.DeserializeMessageID(messageID)
}
func PulsarMsgIDToString(messageID pulsar.MessageID) string {
return strings.ToValidUTF8(string(messageID.Serialize()), "")
}
func StringToPulsarMsgID(msgString string) (pulsar.MessageID, error) {
return pulsar.DeserializeMessageID([]byte(msgString))
}

View File

@ -1,8 +1,7 @@
package pulsar package mqclient
import ( import (
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
) )
type pulsarMessage struct { type pulsarMessage struct {
@ -21,7 +20,7 @@ func (pm *pulsarMessage) Payload() []byte {
return pm.msg.Payload() return pm.msg.Payload()
} }
func (pm *pulsarMessage) ID() client.MessageID { func (pm *pulsarMessage) ID() MessageID {
id := pm.msg.ID() id := pm.msg.ID()
pid := &pulsarID{messageID: id} pid := &pulsarID{messageID: id}
return pid return pid

View File

@ -1,10 +1,9 @@
package pulsar package mqclient
import ( import (
"context" "context"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
) )
type pulsarProducer struct { type pulsarProducer struct {
@ -15,7 +14,7 @@ func (pp *pulsarProducer) Topic() string {
return pp.p.Topic() return pp.p.Topic()
} }
func (pp *pulsarProducer) Send(ctx context.Context, message *client.ProducerMessage) error { func (pp *pulsarProducer) Send(ctx context.Context, message *ProducerMessage) error {
ppm := &pulsar.ProducerMessage{Payload: message.Payload, Properties: message.Properties} ppm := &pulsar.ProducerMessage{Payload: message.Payload, Properties: message.Properties}
_, err := pp.p.Send(ctx, ppm) _, err := pp.p.Send(ctx, ppm)
return err return err

View File

@ -1,4 +1,4 @@
package rocksmq package mqclient
import ( import (
"strconv" "strconv"
@ -6,9 +6,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
type rmqClient struct { type rmqClient struct {
@ -24,7 +22,7 @@ func NewRmqClient(opts rocksmq.ClientOptions) (*rmqClient, error) {
return &rmqClient{client: c}, nil return &rmqClient{client: c}, nil
} }
func (rc *rmqClient) CreateProducer(options client.ProducerOptions) (client.Producer, error) { func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) {
rmqOpts := rocksmq.ProducerOptions{Topic: options.Topic} rmqOpts := rocksmq.ProducerOptions{Topic: options.Topic}
pp, err := rc.client.CreateProducer(rmqOpts) pp, err := rc.client.CreateProducer(rmqOpts)
if err != nil { if err != nil {
@ -34,7 +32,7 @@ func (rc *rmqClient) CreateProducer(options client.ProducerOptions) (client.Prod
return &rp, nil return &rp, nil
} }
func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer, error) { func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
receiveChannel := make(chan rocksmq.ConsumerMessage, options.BufSize) receiveChannel := make(chan rocksmq.ConsumerMessage, options.BufSize)
cli, err := rc.client.Subscribe(rocksmq.ConsumerOptions{ cli, err := rc.client.Subscribe(rocksmq.ConsumerOptions{
@ -46,7 +44,7 @@ func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer,
return nil, err return nil, err
} }
msgChannel := make(chan client.ConsumerMessage, 1) msgChannel := make(chan ConsumerMessage, 1)
rConsumer := &rmqConsumer{c: cli, msgChannel: msgChannel} rConsumer := &rmqConsumer{c: cli, msgChannel: msgChannel}
go func() { go func() {
@ -65,12 +63,12 @@ func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer,
return rConsumer, nil return rConsumer, nil
} }
func (rc *rmqClient) EarliestMessageID() client.MessageID { func (rc *rmqClient) EarliestMessageID() MessageID {
rID := rocksmq.EarliestMessageID() rID := rocksmq.EarliestMessageID()
return &rmqID{messageID: rID} return &rmqID{messageID: rID}
} }
func (rc *rmqClient) StringToMsgID(id string) (client.MessageID, error) { func (rc *rmqClient) StringToMsgID(id string) (MessageID, error) {
rID, err := strconv.ParseInt(id, 10, 64) rID, err := strconv.ParseInt(id, 10, 64)
if err != nil { if err != nil {
return nil, err return nil, err
@ -78,8 +76,8 @@ func (rc *rmqClient) StringToMsgID(id string) (client.MessageID, error) {
return &rmqID{messageID: rID}, nil return &rmqID{messageID: rID}, nil
} }
func (rc *rmqClient) BytesToMsgID(id []byte) (client.MessageID, error) { func (rc *rmqClient) BytesToMsgID(id []byte) (MessageID, error) {
rID, err := typeutil.DeserializeRmqID(id) rID, err := DeserializeRmqID(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,29 +1,28 @@
package rocksmq package mqclient
import ( import (
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
) )
type rmqConsumer struct { type rmqConsumer struct {
c rocksmq.Consumer c rocksmq.Consumer
msgChannel chan client.ConsumerMessage msgChannel chan ConsumerMessage
} }
func (rc *rmqConsumer) Subscription() string { func (rc *rmqConsumer) Subscription() string {
return rc.c.Subscription() return rc.c.Subscription()
} }
func (rc *rmqConsumer) Chan() <-chan client.ConsumerMessage { func (rc *rmqConsumer) Chan() <-chan ConsumerMessage {
return rc.msgChannel return rc.msgChannel
} }
func (rc *rmqConsumer) Seek(id client.MessageID) error { func (rc *rmqConsumer) Seek(id MessageID) error {
msgID := id.(*rmqID).messageID msgID := id.(*rmqID).messageID
return rc.c.Seek(msgID) return rc.c.Seek(msgID)
} }
func (rc *rmqConsumer) Ack(message client.ConsumerMessage) { func (rc *rmqConsumer) Ack(message ConsumerMessage) {
} }
func (rc *rmqConsumer) Close() { func (rc *rmqConsumer) Close() {

View File

@ -0,0 +1,25 @@
package mqclient
import (
"encoding/binary"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq"
)
type rmqID struct {
messageID rocksmq.UniqueID
}
func (rid *rmqID) Serialize() []byte {
return SerializeRmqID(rid.messageID)
}
func SerializeRmqID(messageID int64) []byte {
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(messageID))
return b
}
func DeserializeRmqID(messageID []byte) (int64, error) {
return int64(binary.LittleEndian.Uint64(messageID)), nil
}

View File

@ -1,7 +1,6 @@
package rocksmq package mqclient
import ( import (
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
) )
@ -21,6 +20,6 @@ func (rm *rmqMessage) Payload() []byte {
return rm.msg.Payload return rm.msg.Payload
} }
func (rm *rmqMessage) ID() client.MessageID { func (rm *rmqMessage) ID() MessageID {
return &rmqID{messageID: rm.msg.MsgID} return &rmqID{messageID: rm.msg.MsgID}
} }

View File

@ -1,9 +1,8 @@
package rocksmq package mqclient
import ( import (
"context" "context"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
) )
@ -15,7 +14,7 @@ func (rp *rmqProducer) Topic() string {
return rp.p.Topic() return rp.p.Topic()
} }
func (rp *rmqProducer) Send(ctx context.Context, message *client.ProducerMessage) error { func (rp *rmqProducer) Send(ctx context.Context, message *ProducerMessage) error {
pm := &rocksmq.ProducerMessage{Payload: message.Payload} pm := &rocksmq.ProducerMessage{Payload: message.Payload}
return rp.p.Send(pm) return rp.p.Send(pm)
} }

View File

@ -4,11 +4,6 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"reflect" "reflect"
"strings"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/apache/pulsar-client-go/pulsar"
) )
// BytesToUint64 converts a byte slice to uint64. // BytesToUint64 converts a byte slice to uint64.
@ -43,36 +38,6 @@ func Uint64ToBytes(v uint64) []byte {
return b return b
} }
func PulsarMsgIDToString(messageID pulsar.MessageID) string {
return strings.ToValidUTF8(string(messageID.Serialize()), "")
}
func MsgIDToString(messageID client.MessageID) string {
return strings.ToValidUTF8(string(messageID.Serialize()), "")
}
func StringToPulsarMsgID(msgString string) (pulsar.MessageID, error) {
return pulsar.DeserializeMessageID([]byte(msgString))
}
func SerializePulsarMsgID(messageID pulsar.MessageID) []byte {
return messageID.Serialize()
}
func DeserializePulsarMsgID(messageID []byte) (pulsar.MessageID, error) {
return pulsar.DeserializeMessageID(messageID)
}
func SerializeRmqID(messageID int64) []byte {
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(messageID))
return b
}
func DeserializeRmqID(messageID []byte) (int64, error) {
return int64(binary.LittleEndian.Uint64(messageID)), nil
}
func SliceRemoveDuplicate(a interface{}) (ret []interface{}) { func SliceRemoveDuplicate(a interface{}) (ret []interface{}) {
if reflect.TypeOf(a).Kind() != reflect.Slice { if reflect.TypeOf(a).Kind() != reflect.Slice {
fmt.Printf("input is not slice but %T\n", a) fmt.Printf("input is not slice but %T\n", a)