From 8b627900179a087dc782f444f1902f6ab4a25a22 Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Fri, 2 Apr 2021 10:01:11 +0800 Subject: [PATCH] Reorganize mqclient Signed-off-by: Xiangyu Wang --- internal/msgstream/client/pulsar/pulsar_id.go | 11 ----- internal/msgstream/client/rocksmq/rmq_id.go | 14 ------- internal/msgstream/ms/msgstream_impl.go | 40 +++++++++---------- internal/msgstream/ms/msgstream_impl_test.go | 32 +++++++-------- internal/msgstream/pulsarms/factory.go | 2 +- .../msgstream/pulsarms/pulsar_msgstream.go | 6 +-- internal/msgstream/rmqms/factory.go | 2 +- internal/msgstream/rmqms/rmq_msgstream.go | 12 +++--- .../client => util/mqclient}/client.go | 2 +- .../client => util/mqclient}/consumer.go | 2 +- .../{msgstream/client => util/mqclient}/id.go | 2 +- .../client => util/mqclient}/message.go | 2 +- .../client => util/mqclient}/producer.go | 2 +- .../pulsar => util/mqclient}/pulsar_client.go | 20 +++++----- .../mqclient}/pulsar_consumer.go | 11 +++-- internal/util/mqclient/pulsar_id.go | 31 ++++++++++++++ .../mqclient}/pulsar_message.go | 5 +-- .../mqclient}/pulsar_producer.go | 5 +-- .../rocksmq => util/mqclient}/rmq_client.go | 18 ++++----- .../rocksmq => util/mqclient}/rmq_consumer.go | 11 +++-- internal/util/mqclient/rmq_id.go | 25 ++++++++++++ .../rocksmq => util/mqclient}/rmq_message.go | 5 +-- .../rocksmq => util/mqclient}/rmq_producer.go | 5 +-- internal/util/typeutil/convension.go | 35 ---------------- 24 files changed, 143 insertions(+), 157 deletions(-) delete mode 100644 internal/msgstream/client/pulsar/pulsar_id.go delete mode 100644 internal/msgstream/client/rocksmq/rmq_id.go rename internal/{msgstream/client => util/mqclient}/client.go (96%) rename internal/{msgstream/client => util/mqclient}/consumer.go (99%) rename internal/{msgstream/client => util/mqclient}/id.go (89%) rename internal/{msgstream/client => util/mqclient}/message.go (97%) rename internal/{msgstream/client => util/mqclient}/producer.go (95%) rename internal/{msgstream/client/pulsar => util/mqclient}/pulsar_client.go (75%) rename internal/{msgstream/client/pulsar => util/mqclient}/pulsar_consumer.go (56%) create mode 100644 internal/util/mqclient/pulsar_id.go rename internal/{msgstream/client/pulsar => util/mqclient}/pulsar_message.go (75%) rename internal/{msgstream/client/pulsar => util/mqclient}/pulsar_producer.go (67%) rename internal/{msgstream/client/rocksmq => util/mqclient}/rmq_client.go (71%) rename internal/{msgstream/client/rocksmq => util/mqclient}/rmq_consumer.go (53%) create mode 100644 internal/util/mqclient/rmq_id.go rename internal/{msgstream/client/rocksmq => util/mqclient}/rmq_message.go (74%) rename internal/{msgstream/client/rocksmq => util/mqclient}/rmq_producer.go (65%) diff --git a/internal/msgstream/client/pulsar/pulsar_id.go b/internal/msgstream/client/pulsar/pulsar_id.go deleted file mode 100644 index 741c5af3d5..0000000000 --- a/internal/msgstream/client/pulsar/pulsar_id.go +++ /dev/null @@ -1,11 +0,0 @@ -package pulsar - -import "github.com/apache/pulsar-client-go/pulsar" - -type pulsarID struct { - messageID pulsar.MessageID -} - -func (pid *pulsarID) Serialize() []byte { - return pid.messageID.Serialize() -} diff --git a/internal/msgstream/client/rocksmq/rmq_id.go b/internal/msgstream/client/rocksmq/rmq_id.go deleted file mode 100644 index 537e6ad57d..0000000000 --- a/internal/msgstream/client/rocksmq/rmq_id.go +++ /dev/null @@ -1,14 +0,0 @@ -package rocksmq - -import ( - "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -type rmqID struct { - messageID rocksmq.UniqueID -} - -func (rid *rmqID) Serialize() []byte { - return typeutil.SerializeRmqID(rid.messageID) -} diff --git a/internal/msgstream/ms/msgstream_impl.go b/internal/msgstream/ms/msgstream_impl.go index 1957b26780..49e27c9b9f 100644 --- a/internal/msgstream/ms/msgstream_impl.go +++ b/internal/msgstream/ms/msgstream_impl.go @@ -12,18 +12,18 @@ import ( "github.com/opentracing/opentracing-go" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/mqclient" "github.com/zilliztech/milvus-distributed/internal/util/trace" "go.uber.org/zap" ) -type MessageID = client.MessageID -type Client = client.Client -type Producer = client.Producer -type Consumer = client.Consumer +type MessageID = mqclient.MessageID +type Client = mqclient.Client +type Producer = mqclient.Producer +type Consumer = mqclient.Consumer type TsMsg = msgstream.TsMsg type MsgPack = msgstream.MsgPack type MsgType = msgstream.MsgType @@ -88,7 +88,7 @@ func NewMsgStream(ctx context.Context, func (ms *msgStream) AsProducer(channels []string) { for _, channel := range channels { fn := func() error { - pp, err := ms.client.CreateProducer(client.ProducerOptions{Topic: channel}) + pp, err := ms.client.CreateProducer(mqclient.ProducerOptions{Topic: channel}) if err != nil { return err } @@ -117,12 +117,12 @@ func (ms *msgStream) AsConsumer(channels []string, continue } fn := func() error { - receiveChannel := make(chan client.ConsumerMessage, ms.bufSize) - pc, err := ms.client.Subscribe(client.ConsumerOptions{ + receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize) + pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{ Topic: channel, SubscriptionName: subName, - Type: client.KeyShared, - SubscriptionInitialPosition: client.SubscriptionPositionEarliest, + Type: mqclient.KeyShared, + SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest, MessageChannel: receiveChannel, }) if err != nil { @@ -234,7 +234,7 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error { return err } - msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} + msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}} trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) @@ -266,7 +266,7 @@ func (ms *msgStream) Broadcast(msgPack *MsgPack) error { return err } - msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} + msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}} trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) @@ -424,12 +424,12 @@ func (ms *TtMsgStream) AsConsumer(channels []string, continue } fn := func() error { - receiveChannel := make(chan client.ConsumerMessage, ms.bufSize) - pc, err := ms.client.Subscribe(client.ConsumerOptions{ + receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize) + pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{ Topic: channel, SubscriptionName: subName, - Type: client.KeyShared, - SubscriptionInitialPosition: client.SubscriptionPositionEarliest, + Type: mqclient.KeyShared, + SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest, MessageChannel: receiveChannel, }) if err != nil { @@ -676,12 +676,12 @@ func (ms *TtMsgStream) Seek(mp *internalpb.MsgPosition) error { } fn := func() error { - receiveChannel := make(chan client.ConsumerMessage, ms.bufSize) - consumer, err = ms.client.Subscribe(client.ConsumerOptions{ + receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize) + consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{ Topic: seekChannel, SubscriptionName: subName, - SubscriptionInitialPosition: client.SubscriptionPositionEarliest, - Type: client.KeyShared, + SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest, + Type: mqclient.KeyShared, MessageChannel: receiveChannel, }) if err != nil { diff --git a/internal/msgstream/ms/msgstream_impl_test.go b/internal/msgstream/ms/msgstream_impl_test.go index ebe38d4c71..c220331bf2 100644 --- a/internal/msgstream/ms/msgstream_impl_test.go +++ b/internal/msgstream/ms/msgstream_impl_test.go @@ -9,7 +9,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/allocator" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - rocksmq2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/rocksmq" client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq" "go.etcd.io/etcd/clientv3" @@ -18,7 +17,8 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/stretchr/testify/assert" - pulsar2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/pulsar" + + "github.com/zilliztech/milvus-distributed/internal/util/mqclient" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -187,7 +187,7 @@ func initPulsarStream(pulsarAddress string, factory := msgstream.ProtoUDFactory{} // set input stream - pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) for _, opt := range opts { @@ -197,7 +197,7 @@ func initPulsarStream(pulsarAddress string, var input msgstream.MsgStream = inputStream // set output stream - pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() @@ -214,7 +214,7 @@ func initPulsarTtStream(pulsarAddress string, factory := msgstream.ProtoUDFactory{} // set input stream - pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) for _, opt := range opts { @@ -224,7 +224,7 @@ func initPulsarTtStream(pulsarAddress string, var input msgstream.MsgStream = inputStream // set output stream - pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewTtMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() @@ -433,12 +433,12 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { factory := msgstream.ProtoUDFactory{} - pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) inputStream.Start() - pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() @@ -487,12 +487,12 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, deleteMsg) factory := msgstream.ProtoUDFactory{} - pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) inputStream.Start() - pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() @@ -521,12 +521,12 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4, 4)) factory := msgstream.ProtoUDFactory{} - pulsarClient, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) inputStream.Start() - pulsarClient2, _ := pulsar2.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() @@ -701,7 +701,7 @@ func initRmqStream(producerChannels []string, opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) { factory := msgstream.ProtoUDFactory{} - rmqClient, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) + rmqClient, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) inputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) for _, opt := range opts { @@ -710,7 +710,7 @@ func initRmqStream(producerChannels []string, inputStream.Start() var input msgstream.MsgStream = inputStream - rmqClient2, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) + rmqClient2, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) outputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerGroupName) outputStream.Start() @@ -725,7 +725,7 @@ func initRmqTtStream(producerChannels []string, opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) { factory := msgstream.ProtoUDFactory{} - rmqClient, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) + rmqClient, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) inputStream, _ := NewMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) for _, opt := range opts { @@ -734,7 +734,7 @@ func initRmqTtStream(producerChannels []string, inputStream.Start() var input msgstream.MsgStream = inputStream - rmqClient2, _ := rocksmq2.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) + rmqClient2, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq}) outputStream, _ := NewTtMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerGroupName) outputStream.Start() diff --git a/internal/msgstream/pulsarms/factory.go b/internal/msgstream/pulsarms/factory.go index 053bfcb434..179e9eb728 100644 --- a/internal/msgstream/pulsarms/factory.go +++ b/internal/msgstream/pulsarms/factory.go @@ -4,8 +4,8 @@ import ( "context" "github.com/apache/pulsar-client-go/pulsar" - pulsar2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/pulsar" "github.com/zilliztech/milvus-distributed/internal/msgstream/ms" + pulsar2 "github.com/zilliztech/milvus-distributed/internal/util/mqclient" "github.com/mitchellh/mapstructure" "github.com/zilliztech/milvus-distributed/internal/msgstream" diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index ddf987c09e..ad40c35cac 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -19,8 +19,8 @@ import ( "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/mqclient" "github.com/zilliztech/milvus-distributed/internal/util/trace" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type TsMsg = msgstream.TsMsg @@ -471,7 +471,7 @@ func (ms *PulsarMsgStream) Chan() <-chan *MsgPack { func (ms *PulsarMsgStream) Seek(mp *internalpb.MsgPosition) error { if _, ok := ms.consumers[mp.ChannelName]; ok { consumer := ms.consumers[mp.ChannelName] - messageID, err := typeutil.DeserializePulsarMsgID(mp.MsgID) + messageID, err := mqclient.DeserializePulsarMsgID(mp.MsgID) if err != nil { return err } @@ -776,7 +776,7 @@ func (ms *PulsarTtMsgStream) Seek(mp *internalpb.MsgPosition) error { if consumer == nil { return errors.New("pulsar is not ready, consumer is nil") } - seekMsgID, err := typeutil.DeserializePulsarMsgID(mp.MsgID) + seekMsgID, err := mqclient.DeserializePulsarMsgID(mp.MsgID) if err != nil { return err } diff --git a/internal/msgstream/rmqms/factory.go b/internal/msgstream/rmqms/factory.go index 15d5924895..872e81511d 100644 --- a/internal/msgstream/rmqms/factory.go +++ b/internal/msgstream/rmqms/factory.go @@ -3,8 +3,8 @@ package rmqms import ( "context" - rocksmq2 "github.com/zilliztech/milvus-distributed/internal/msgstream/client/rocksmq" "github.com/zilliztech/milvus-distributed/internal/msgstream/ms" + rocksmq2 "github.com/zilliztech/milvus-distributed/internal/util/mqclient" client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" "github.com/mitchellh/mapstructure" diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index e961ce28ff..46072e1292 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -17,8 +17,8 @@ import ( "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/mqclient" client "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type TsMsg = msgstream.TsMsg @@ -301,7 +301,7 @@ func (rms *RmqMsgStream) receiveMsg(consumer Consumer) { tsMsg.SetPosition(&msgstream.MsgPosition{ ChannelName: filepath.Base(consumer.Topic()), - MsgID: typeutil.SerializeRmqID(rmqMsg.MsgID), + MsgID: mqclient.SerializeRmqID(rmqMsg.MsgID), }) msgPack := MsgPack{Msgs: []TsMsg{tsMsg}} @@ -317,7 +317,7 @@ func (rms *RmqMsgStream) Chan() <-chan *msgstream.MsgPack { func (rms *RmqMsgStream) Seek(mp *msgstream.MsgPosition) error { if _, ok := rms.consumers[mp.ChannelName]; ok { consumer := rms.consumers[mp.ChannelName] - msgID, err := typeutil.DeserializeRmqID(mp.MsgID) + msgID, err := mqclient.DeserializeRmqID(mp.MsgID) if err != nil { return err } @@ -544,7 +544,7 @@ func (rtms *RmqTtMsgStream) findTimeTick(consumer Consumer, tsMsg.SetPosition(&msgstream.MsgPosition{ ChannelName: filepath.Base(consumer.Topic()), - MsgID: typeutil.SerializeRmqID(rmqMsg.MsgID), + MsgID: mqclient.SerializeRmqID(rmqMsg.MsgID), }) rtms.unsolvedMutex.Lock() @@ -590,7 +590,7 @@ func (rtms *RmqTtMsgStream) Seek(mp *msgstream.MsgPosition) error { if consumer == nil { return errors.New("RocksMQ is not ready, consumer is nil") } - seekMsgID, err := typeutil.DeserializeRmqID(mp.MsgID) + seekMsgID, err := mqclient.DeserializeRmqID(mp.MsgID) if err != nil { return err } @@ -629,7 +629,7 @@ func (rtms *RmqTtMsgStream) Seek(mp *msgstream.MsgPosition) error { if tsMsg.BeginTs() > mp.Timestamp { tsMsg.SetPosition(&msgstream.MsgPosition{ ChannelName: filepath.Base(consumer.Topic()), - MsgID: typeutil.SerializeRmqID(rmqMsg.MsgID), + MsgID: mqclient.SerializeRmqID(rmqMsg.MsgID), }) rtms.unsolvedBuf[consumer] = append(rtms.unsolvedBuf[consumer], tsMsg) } diff --git a/internal/msgstream/client/client.go b/internal/util/mqclient/client.go similarity index 96% rename from internal/msgstream/client/client.go rename to internal/util/mqclient/client.go index d8bee426c1..652df573bc 100644 --- a/internal/msgstream/client/client.go +++ b/internal/util/mqclient/client.go @@ -1,4 +1,4 @@ -package client +package mqclient type Client interface { // Create a producer instance diff --git a/internal/msgstream/client/consumer.go b/internal/util/mqclient/consumer.go similarity index 99% rename from internal/msgstream/client/consumer.go rename to internal/util/mqclient/consumer.go index ac959a285c..0c4bdb0432 100644 --- a/internal/msgstream/client/consumer.go +++ b/internal/util/mqclient/consumer.go @@ -1,4 +1,4 @@ -package client +package mqclient type SubscriptionInitialPosition int diff --git a/internal/msgstream/client/id.go b/internal/util/mqclient/id.go similarity index 89% rename from internal/msgstream/client/id.go rename to internal/util/mqclient/id.go index 6441cff13a..c5d0dc296a 100644 --- a/internal/msgstream/client/id.go +++ b/internal/util/mqclient/id.go @@ -1,4 +1,4 @@ -package client +package mqclient type MessageID interface { // Serialize the message id into a sequence of bytes that can be stored somewhere else diff --git a/internal/msgstream/client/message.go b/internal/util/mqclient/message.go similarity index 97% rename from internal/msgstream/client/message.go rename to internal/util/mqclient/message.go index 72f231a83f..2d3d050b72 100644 --- a/internal/msgstream/client/message.go +++ b/internal/util/mqclient/message.go @@ -1,4 +1,4 @@ -package client +package mqclient type ConsumerMessage interface { // Topic get the topic from which this message originated from diff --git a/internal/msgstream/client/producer.go b/internal/util/mqclient/producer.go similarity index 95% rename from internal/msgstream/client/producer.go rename to internal/util/mqclient/producer.go index 91bb855b93..c7cf3eee61 100644 --- a/internal/msgstream/client/producer.go +++ b/internal/util/mqclient/producer.go @@ -1,4 +1,4 @@ -package client +package mqclient import "context" diff --git a/internal/msgstream/client/pulsar/pulsar_client.go b/internal/util/mqclient/pulsar_client.go similarity index 75% rename from internal/msgstream/client/pulsar/pulsar_client.go rename to internal/util/mqclient/pulsar_client.go index 362068beaa..53fb2cdf65 100644 --- a/internal/msgstream/client/pulsar/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -1,4 +1,4 @@ -package pulsar +package mqclient import ( "errors" @@ -7,8 +7,6 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/zilliztech/milvus-distributed/internal/log" - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.uber.org/zap" ) @@ -26,7 +24,7 @@ func NewPulsarClient(opts pulsar.ClientOptions) (*pulsarClient, error) { return cli, nil } -func (pc *pulsarClient) CreateProducer(options client.ProducerOptions) (client.Producer, error) { +func (pc *pulsarClient) CreateProducer(options ProducerOptions) (Producer, error) { opts := pulsar.ProducerOptions{Topic: options.Topic} pp, err := pc.client.CreateProducer(opts) if err != nil { @@ -39,7 +37,7 @@ func (pc *pulsarClient) CreateProducer(options client.ProducerOptions) (client.P return producer, nil } -func (pc *pulsarClient) Subscribe(options client.ConsumerOptions) (client.Consumer, error) { +func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize) consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ Topic: options.Topic, @@ -51,7 +49,7 @@ func (pc *pulsarClient) Subscribe(options client.ConsumerOptions) (client.Consum if err != nil { return nil, err } - msgChannel := make(chan client.ConsumerMessage, 1) + msgChannel := make(chan ConsumerMessage, 1) pConsumer := &pulsarConsumer{c: consumer, msgChannel: msgChannel} go func() { @@ -70,21 +68,21 @@ func (pc *pulsarClient) Subscribe(options client.ConsumerOptions) (client.Consum return pConsumer, nil } -func (pc *pulsarClient) EarliestMessageID() client.MessageID { +func (pc *pulsarClient) EarliestMessageID() MessageID { msgID := pulsar.EarliestMessageID() return &pulsarID{messageID: msgID} } -func (pc *pulsarClient) StringToMsgID(id string) (client.MessageID, error) { - pID, err := typeutil.StringToPulsarMsgID(id) +func (pc *pulsarClient) StringToMsgID(id string) (MessageID, error) { + pID, err := StringToPulsarMsgID(id) if err != nil { return nil, err } return &pulsarID{messageID: pID}, nil } -func (pc *pulsarClient) BytesToMsgID(id []byte) (client.MessageID, error) { - pID, err := typeutil.DeserializePulsarMsgID(id) +func (pc *pulsarClient) BytesToMsgID(id []byte) (MessageID, error) { + pID, err := DeserializePulsarMsgID(id) if err != nil { return nil, err } diff --git a/internal/msgstream/client/pulsar/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go similarity index 56% rename from internal/msgstream/client/pulsar/pulsar_consumer.go rename to internal/util/mqclient/pulsar_consumer.go index 1d6c8312cb..dd5612c7f7 100644 --- a/internal/msgstream/client/pulsar/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -1,29 +1,28 @@ -package pulsar +package mqclient import ( "github.com/apache/pulsar-client-go/pulsar" - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" ) type pulsarConsumer struct { c pulsar.Consumer - msgChannel chan client.ConsumerMessage + msgChannel chan ConsumerMessage } func (pc *pulsarConsumer) Subscription() string { return pc.c.Subscription() } -func (pc *pulsarConsumer) Chan() <-chan client.ConsumerMessage { +func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { return pc.msgChannel } -func (pc *pulsarConsumer) Seek(id client.MessageID) error { +func (pc *pulsarConsumer) Seek(id MessageID) error { messageID := id.(*pulsarID).messageID return pc.c.Seek(messageID) } -func (pc *pulsarConsumer) Ack(message client.ConsumerMessage) { +func (pc *pulsarConsumer) Ack(message ConsumerMessage) { pm := message.(*pulsarMessage) pc.c.Ack(pm.msg) } diff --git a/internal/util/mqclient/pulsar_id.go b/internal/util/mqclient/pulsar_id.go new file mode 100644 index 0000000000..ae2167b4f1 --- /dev/null +++ b/internal/util/mqclient/pulsar_id.go @@ -0,0 +1,31 @@ +package mqclient + +import ( + "strings" + + "github.com/apache/pulsar-client-go/pulsar" +) + +type pulsarID struct { + messageID pulsar.MessageID +} + +func (pid *pulsarID) Serialize() []byte { + return pid.messageID.Serialize() +} + +func SerializePulsarMsgID(messageID pulsar.MessageID) []byte { + return messageID.Serialize() +} + +func DeserializePulsarMsgID(messageID []byte) (pulsar.MessageID, error) { + return pulsar.DeserializeMessageID(messageID) +} + +func PulsarMsgIDToString(messageID pulsar.MessageID) string { + return strings.ToValidUTF8(string(messageID.Serialize()), "") +} + +func StringToPulsarMsgID(msgString string) (pulsar.MessageID, error) { + return pulsar.DeserializeMessageID([]byte(msgString)) +} diff --git a/internal/msgstream/client/pulsar/pulsar_message.go b/internal/util/mqclient/pulsar_message.go similarity index 75% rename from internal/msgstream/client/pulsar/pulsar_message.go rename to internal/util/mqclient/pulsar_message.go index ee60a479ab..7885c0a747 100644 --- a/internal/msgstream/client/pulsar/pulsar_message.go +++ b/internal/util/mqclient/pulsar_message.go @@ -1,8 +1,7 @@ -package pulsar +package mqclient import ( "github.com/apache/pulsar-client-go/pulsar" - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" ) type pulsarMessage struct { @@ -21,7 +20,7 @@ func (pm *pulsarMessage) Payload() []byte { return pm.msg.Payload() } -func (pm *pulsarMessage) ID() client.MessageID { +func (pm *pulsarMessage) ID() MessageID { id := pm.msg.ID() pid := &pulsarID{messageID: id} return pid diff --git a/internal/msgstream/client/pulsar/pulsar_producer.go b/internal/util/mqclient/pulsar_producer.go similarity index 67% rename from internal/msgstream/client/pulsar/pulsar_producer.go rename to internal/util/mqclient/pulsar_producer.go index ead22f7bc9..f86e71cfd5 100644 --- a/internal/msgstream/client/pulsar/pulsar_producer.go +++ b/internal/util/mqclient/pulsar_producer.go @@ -1,10 +1,9 @@ -package pulsar +package mqclient import ( "context" "github.com/apache/pulsar-client-go/pulsar" - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" ) type pulsarProducer struct { @@ -15,7 +14,7 @@ func (pp *pulsarProducer) Topic() string { return pp.p.Topic() } -func (pp *pulsarProducer) Send(ctx context.Context, message *client.ProducerMessage) error { +func (pp *pulsarProducer) Send(ctx context.Context, message *ProducerMessage) error { ppm := &pulsar.ProducerMessage{Payload: message.Payload, Properties: message.Properties} _, err := pp.p.Send(ctx, ppm) return err diff --git a/internal/msgstream/client/rocksmq/rmq_client.go b/internal/util/mqclient/rmq_client.go similarity index 71% rename from internal/msgstream/client/rocksmq/rmq_client.go rename to internal/util/mqclient/rmq_client.go index 780de35d87..73830df5fb 100644 --- a/internal/msgstream/client/rocksmq/rmq_client.go +++ b/internal/util/mqclient/rmq_client.go @@ -1,4 +1,4 @@ -package rocksmq +package mqclient import ( "strconv" @@ -6,9 +6,7 @@ import ( "go.uber.org/zap" "github.com/zilliztech/milvus-distributed/internal/log" - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type rmqClient struct { @@ -24,7 +22,7 @@ func NewRmqClient(opts rocksmq.ClientOptions) (*rmqClient, error) { return &rmqClient{client: c}, nil } -func (rc *rmqClient) CreateProducer(options client.ProducerOptions) (client.Producer, error) { +func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) { rmqOpts := rocksmq.ProducerOptions{Topic: options.Topic} pp, err := rc.client.CreateProducer(rmqOpts) if err != nil { @@ -34,7 +32,7 @@ func (rc *rmqClient) CreateProducer(options client.ProducerOptions) (client.Prod return &rp, nil } -func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer, error) { +func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) { receiveChannel := make(chan rocksmq.ConsumerMessage, options.BufSize) cli, err := rc.client.Subscribe(rocksmq.ConsumerOptions{ @@ -46,7 +44,7 @@ func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer, return nil, err } - msgChannel := make(chan client.ConsumerMessage, 1) + msgChannel := make(chan ConsumerMessage, 1) rConsumer := &rmqConsumer{c: cli, msgChannel: msgChannel} go func() { @@ -65,12 +63,12 @@ func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer, return rConsumer, nil } -func (rc *rmqClient) EarliestMessageID() client.MessageID { +func (rc *rmqClient) EarliestMessageID() MessageID { rID := rocksmq.EarliestMessageID() return &rmqID{messageID: rID} } -func (rc *rmqClient) StringToMsgID(id string) (client.MessageID, error) { +func (rc *rmqClient) StringToMsgID(id string) (MessageID, error) { rID, err := strconv.ParseInt(id, 10, 64) if err != nil { return nil, err @@ -78,8 +76,8 @@ func (rc *rmqClient) StringToMsgID(id string) (client.MessageID, error) { return &rmqID{messageID: rID}, nil } -func (rc *rmqClient) BytesToMsgID(id []byte) (client.MessageID, error) { - rID, err := typeutil.DeserializeRmqID(id) +func (rc *rmqClient) BytesToMsgID(id []byte) (MessageID, error) { + rID, err := DeserializeRmqID(id) if err != nil { return nil, err } diff --git a/internal/msgstream/client/rocksmq/rmq_consumer.go b/internal/util/mqclient/rmq_consumer.go similarity index 53% rename from internal/msgstream/client/rocksmq/rmq_consumer.go rename to internal/util/mqclient/rmq_consumer.go index f55da13611..c219ce3d5c 100644 --- a/internal/msgstream/client/rocksmq/rmq_consumer.go +++ b/internal/util/mqclient/rmq_consumer.go @@ -1,29 +1,28 @@ -package rocksmq +package mqclient import ( - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" ) type rmqConsumer struct { c rocksmq.Consumer - msgChannel chan client.ConsumerMessage + msgChannel chan ConsumerMessage } func (rc *rmqConsumer) Subscription() string { return rc.c.Subscription() } -func (rc *rmqConsumer) Chan() <-chan client.ConsumerMessage { +func (rc *rmqConsumer) Chan() <-chan ConsumerMessage { return rc.msgChannel } -func (rc *rmqConsumer) Seek(id client.MessageID) error { +func (rc *rmqConsumer) Seek(id MessageID) error { msgID := id.(*rmqID).messageID return rc.c.Seek(msgID) } -func (rc *rmqConsumer) Ack(message client.ConsumerMessage) { +func (rc *rmqConsumer) Ack(message ConsumerMessage) { } func (rc *rmqConsumer) Close() { diff --git a/internal/util/mqclient/rmq_id.go b/internal/util/mqclient/rmq_id.go new file mode 100644 index 0000000000..93af49b443 --- /dev/null +++ b/internal/util/mqclient/rmq_id.go @@ -0,0 +1,25 @@ +package mqclient + +import ( + "encoding/binary" + + "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq" +) + +type rmqID struct { + messageID rocksmq.UniqueID +} + +func (rid *rmqID) Serialize() []byte { + return SerializeRmqID(rid.messageID) +} + +func SerializeRmqID(messageID int64) []byte { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(messageID)) + return b +} + +func DeserializeRmqID(messageID []byte) (int64, error) { + return int64(binary.LittleEndian.Uint64(messageID)), nil +} diff --git a/internal/msgstream/client/rocksmq/rmq_message.go b/internal/util/mqclient/rmq_message.go similarity index 74% rename from internal/msgstream/client/rocksmq/rmq_message.go rename to internal/util/mqclient/rmq_message.go index cd1eb6e254..8725239d8d 100644 --- a/internal/msgstream/client/rocksmq/rmq_message.go +++ b/internal/util/mqclient/rmq_message.go @@ -1,7 +1,6 @@ -package rocksmq +package mqclient import ( - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" ) @@ -21,6 +20,6 @@ func (rm *rmqMessage) Payload() []byte { return rm.msg.Payload } -func (rm *rmqMessage) ID() client.MessageID { +func (rm *rmqMessage) ID() MessageID { return &rmqID{messageID: rm.msg.MsgID} } diff --git a/internal/msgstream/client/rocksmq/rmq_producer.go b/internal/util/mqclient/rmq_producer.go similarity index 65% rename from internal/msgstream/client/rocksmq/rmq_producer.go rename to internal/util/mqclient/rmq_producer.go index bfbcb254bb..43fe30baf6 100644 --- a/internal/msgstream/client/rocksmq/rmq_producer.go +++ b/internal/util/mqclient/rmq_producer.go @@ -1,9 +1,8 @@ -package rocksmq +package mqclient import ( "context" - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq" ) @@ -15,7 +14,7 @@ func (rp *rmqProducer) Topic() string { return rp.p.Topic() } -func (rp *rmqProducer) Send(ctx context.Context, message *client.ProducerMessage) error { +func (rp *rmqProducer) Send(ctx context.Context, message *ProducerMessage) error { pm := &rocksmq.ProducerMessage{Payload: message.Payload} return rp.p.Send(pm) } diff --git a/internal/util/typeutil/convension.go b/internal/util/typeutil/convension.go index 0e2ae1c42a..b9c7124b25 100644 --- a/internal/util/typeutil/convension.go +++ b/internal/util/typeutil/convension.go @@ -4,11 +4,6 @@ import ( "encoding/binary" "fmt" "reflect" - "strings" - - "github.com/zilliztech/milvus-distributed/internal/msgstream/client" - - "github.com/apache/pulsar-client-go/pulsar" ) // BytesToUint64 converts a byte slice to uint64. @@ -43,36 +38,6 @@ func Uint64ToBytes(v uint64) []byte { return b } -func PulsarMsgIDToString(messageID pulsar.MessageID) string { - return strings.ToValidUTF8(string(messageID.Serialize()), "") -} - -func MsgIDToString(messageID client.MessageID) string { - return strings.ToValidUTF8(string(messageID.Serialize()), "") -} - -func StringToPulsarMsgID(msgString string) (pulsar.MessageID, error) { - return pulsar.DeserializeMessageID([]byte(msgString)) -} - -func SerializePulsarMsgID(messageID pulsar.MessageID) []byte { - return messageID.Serialize() -} - -func DeserializePulsarMsgID(messageID []byte) (pulsar.MessageID, error) { - return pulsar.DeserializeMessageID(messageID) -} - -func SerializeRmqID(messageID int64) []byte { - b := make([]byte, 8) - binary.LittleEndian.PutUint64(b, uint64(messageID)) - return b -} - -func DeserializeRmqID(messageID []byte) (int64, error) { - return int64(binary.LittleEndian.Uint64(messageID)), nil -} - func SliceRemoveDuplicate(a interface{}) (ret []interface{}) { if reflect.TypeOf(a).Kind() != reflect.Slice { fmt.Printf("input is not slice but %T\n", a)