Fix RocksMQ seek speed (#27646)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/25882/head
Xiaofan 2023-10-16 10:18:08 +08:00 committed by GitHub
parent 76bb0a7bd8
commit 333a0c8a53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 268 additions and 347 deletions

4
go.mod
View File

@ -24,8 +24,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.16.5
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230907032509-23756009c643
github.com/milvus-io/milvus/pkg v0.0.1
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20231011053327-5f3a9bd32b37
github.com/minio/minio-go/v7 v7.0.56
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
@ -139,6 +138,7 @@ require (
github.com/mattn/go-colorable v0.1.11 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/milvus-io/milvus/pkg v0.0.0-20230607023836-1593278f9d9c // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/minio/highwayhash v1.0.2 // indirect

7
go.sum
View File

@ -281,6 +281,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
@ -507,6 +508,7 @@ github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYb
github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE=
github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro=
github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@ -545,6 +547,7 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 h1:IVlcvV0CjvfBYYod5ePe89l+3LBAl//6n9kJ9Vr2i0k=
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76/go.mod h1:Iu9BHUvTh8/KpbuSoKx/CaJEdJvFxSverxIy7I+nq7s=
github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
@ -582,8 +585,12 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0-dev.1 h1:x6vhrVyK3wEuXIDHt0uk2l/UFPa/RRGWk1nkjgN5jkI=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0-dev.1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230907032509-23756009c643 h1:3MXEYckliGnyepZeLDrhn+speelsoRKU1IwD8JrxXMo=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230907032509-23756009c643/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20231011053327-5f3a9bd32b37 h1:iwLDdLHL9EGDQWRVZHocjcyAzYTRQEN0NatW6DvzKeY=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20231011053327-5f3a9bd32b37/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -11,9 +11,7 @@
package client
import (
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
)
import "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
// RocksMQ is the type server.RocksMQ
type RocksMQ = server.RocksMQ

View File

@ -164,12 +164,18 @@ func (c *client) deliver(consumer *consumer) {
break
}
for _, msg := range msgs {
// This is the hack, we put property into pl
properties := make(map[string]string, 0)
pl, err := UnmarshalHeader(msg.Payload)
if err == nil && pl != nil && pl.Base != nil {
properties = pl.Base.Properties
}
select {
case consumer.messageCh <- Message{
MsgID: msg.MsgID,
Payload: msg.Payload,
Properties: msg.Properties,
Topic: consumer.Topic(),
case consumer.messageCh <- &RmqMessage{
msgID: msg.MsgID,
payload: msg.Payload,
properties: properties,
topic: consumer.Topic(),
}:
case <-c.closeCh:
return

View File

@ -17,10 +17,14 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -187,7 +191,7 @@ func TestClient_SeekLatest(t *testing.T) {
})
assert.NotNil(t, producer)
assert.NoError(t, err)
msg := &ProducerMessage{
msg := &mqwrapper.ProducerMessage{
Payload: make([]byte, 10),
Properties: map[string]string{},
}
@ -197,7 +201,7 @@ func TestClient_SeekLatest(t *testing.T) {
msgChan := consumer1.Chan()
msgRead, ok := <-msgChan
assert.Equal(t, ok, true)
assert.Equal(t, msgRead.MsgID, id)
assert.Equal(t, msgRead.ID(), &server.RmqID{MessageID: id})
consumer1.Close()
@ -217,10 +221,10 @@ func TestClient_SeekLatest(t *testing.T) {
for loop {
select {
case msg := <-msgChan:
assert.Equal(t, len(msg.Payload), 8)
assert.Equal(t, len(msg.Payload()), 8)
loop = false
case <-ticker.C:
msg := &ProducerMessage{
msg := &mqwrapper.ProducerMessage{
Payload: make([]byte, 8),
}
_, err = producer.Send(msg)
@ -261,7 +265,7 @@ func TestClient_consume(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, consumer)
msg := &ProducerMessage{
msg := &mqwrapper.ProducerMessage{
Payload: make([]byte, 10),
}
id, err := producer.Send(msg)
@ -270,5 +274,80 @@ func TestClient_consume(t *testing.T) {
msgChan := consumer.Chan()
msgConsume, ok := <-msgChan
assert.Equal(t, ok, true)
assert.Equal(t, id, msgConsume.MsgID)
assert.Equal(t, &server.RmqID{MessageID: id}, msgConsume.ID())
}
func TestRocksmq_Properties(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client4"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client, err := NewClient(Options{
Server: rmq,
})
assert.NoError(t, err)
defer client.Close()
topicName := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.NotNil(t, producer)
assert.NoError(t, err)
opt := ConsumerOptions{
Topic: topicName,
SubscriptionName: newConsumerName(),
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
}
consumer, err := client.Subscribe(opt)
assert.NoError(t, err)
assert.NotNil(t, consumer)
timeTickMsg := &msgpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: UniqueID(0),
Timestamp: 100,
SourceID: 0,
},
}
msgb, errMarshal := proto.Marshal(timeTickMsg)
assert.NoError(t, errMarshal)
assert.True(t, len(msgb) > 0)
header, err := UnmarshalHeader(msgb)
assert.NoError(t, err)
assert.NotNil(t, header)
msg := &mqwrapper.ProducerMessage{
Payload: msgb,
Properties: map[string]string{common.TraceIDKey: "a"},
}
_, err = producer.Send(msg)
assert.NoError(t, err)
msg = &mqwrapper.ProducerMessage{
Payload: msgb,
Properties: map[string]string{common.TraceIDKey: "b"},
}
_, err = producer.Send(msg)
assert.NoError(t, err)
msgChan := consumer.Chan()
msgConsume, ok := <-msgChan
assert.True(t, ok)
assert.Equal(t, len(msgConsume.Properties()), 1)
assert.Equal(t, msgConsume.Properties()[common.TraceIDKey], "a")
assert.NoError(t, err)
msgConsume, ok = <-msgChan
assert.True(t, ok)
assert.Equal(t, len(msgConsume.Properties()), 1)
assert.Equal(t, msgConsume.Properties()[common.TraceIDKey], "b")
assert.NoError(t, err)
timeTickMsg2 := &msgpb.TimeTickMsg{}
proto.Unmarshal(msgConsume.Payload(), timeTickMsg2)
assert.Equal(t, timeTickMsg2.Base.MsgType, commonpb.MsgType_TimeTick)
assert.Equal(t, timeTickMsg2.Base.Timestamp, uint64(100))
}

View File

@ -38,16 +38,7 @@ type ConsumerOptions struct {
// Message for this consumer
// When a message is received, it will be pushed to this channel for consumption
MessageChannel chan Message
}
// Message is the message content of a consumer message
type Message struct {
Consumer
MsgID UniqueID
Topic string
Payload []byte
Properties map[string]string
MessageChannel chan mqwrapper.Message
}
// Consumer interface provide operations for a consumer
@ -62,7 +53,7 @@ type Consumer interface {
MsgMutex() chan struct{}
// Message channel
Chan() <-chan Message
Chan() <-chan mqwrapper.Message
// Seek to the uniqueID position
Seek(UniqueID) error //nolint:govet

View File

@ -17,6 +17,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
)
type consumer struct {
@ -29,7 +30,7 @@ type consumer struct {
msgMutex chan struct{}
initCh chan struct{}
messageCh chan Message
messageCh chan mqwrapper.Message
}
func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
@ -47,7 +48,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
messageCh := options.MessageChannel
if options.MessageChannel == nil {
messageCh = make(chan Message, 1)
messageCh = make(chan mqwrapper.Message, 1)
}
// only used for
initCh := make(chan struct{}, 1)
@ -79,7 +80,7 @@ func getExistedConsumer(c *client, options ConsumerOptions, msgMutex chan struct
messageCh := options.MessageChannel
if options.MessageChannel == nil {
messageCh = make(chan Message, 1)
messageCh = make(chan mqwrapper.Message, 1)
}
return &consumer{
@ -108,7 +109,7 @@ func (c *consumer) MsgMutex() chan struct{} {
}
// Chan start consume goroutine and return message channel
func (c *consumer) Chan() <-chan Message {
func (c *consumer) Chan() <-chan mqwrapper.Message {
c.startOnce.Do(func() {
c.client.wg.Add(1)
go c.client.consume(c)

View File

@ -11,24 +11,20 @@
package client
import "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
// ProducerOptions is the options of a producer
type ProducerOptions struct {
Topic string
}
// ProducerMessage is the message of a producer
type ProducerMessage struct {
Payload []byte
Properties map[string]string
}
// Producer provedes some operations for a producer
type Producer interface {
// return the topic which producer is publishing to
Topic() string
// publish a message
Send(message *ProducerMessage) (UniqueID, error)
Send(message *mqwrapper.ProducerMessage) (UniqueID, error)
// Close a producer
Close()

View File

@ -16,6 +16,7 @@ import (
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
)
// assertion make sure implementation
@ -50,11 +51,23 @@ func (p *producer) Topic() string {
}
// Send produce message in rocksmq
func (p *producer) Send(message *ProducerMessage) (UniqueID, error) {
func (p *producer) Send(message *mqwrapper.ProducerMessage) (UniqueID, error) {
// NOTICE: this is the hack.
// we should not unmarshal the payload here but we can not extend the payload byte
payload := message.Payload
header, err := UnmarshalHeader(message.Payload)
if err == nil && header != nil && header.Base != nil {
// try to marshal properties into message if message is real message
header.Base.Properties = message.Properties
payload, err = MarshalHeader(header)
if err != nil {
return 0, err
}
}
ids, err := p.c.server.Produce(p.topic, []server.ProducerMessage{
{
Payload: message.Payload,
Properties: message.Properties,
Payload: payload,
},
})
if err != nil {

View File

@ -9,37 +9,41 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package rmq
package client
import (
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/client"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// Check rmqMessage implements ConsumerMessage
var _ mqwrapper.Message = (*rmqMessage)(nil)
var _ mqwrapper.Message = (*RmqMessage)(nil)
// rmqMessage wraps the message for rocksmq
type rmqMessage struct {
msg client.Message
type RmqMessage struct {
msgID typeutil.UniqueID
topic string
payload []byte
properties map[string]string
}
// Topic returns the topic name of rocksmq message
func (rm *rmqMessage) Topic() string {
return rm.msg.Topic
func (rm *RmqMessage) Topic() string {
return rm.topic
}
// Properties returns the properties of rocksmq message
func (rm *rmqMessage) Properties() map[string]string {
return rm.msg.Properties
func (rm *RmqMessage) Properties() map[string]string {
return rm.properties
}
// Payload returns the payload of rocksmq message
func (rm *rmqMessage) Payload() []byte {
return rm.msg.Payload
func (rm *RmqMessage) Payload() []byte {
return rm.payload
}
// ID returns the id of rocksmq message
func (rm *rmqMessage) ID() mqwrapper.MessageID {
return &rmqID{messageID: rm.msg.MsgID}
func (rm *RmqMessage) ID() mqwrapper.MessageID {
return &server.RmqID{MessageID: rm.msgID}
}

View File

@ -22,6 +22,7 @@ import (
server2 "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func newTopicName() string {
@ -46,6 +47,7 @@ func newMockClient() *client {
func newRocksMQ(t *testing.T, rmqPath string) server2.RocksMQ {
rocksdbPath := rmqPath
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := server2.NewRocksMQ(rocksdbPath, nil)
assert.NoError(t, err)
return rmq

View File

@ -0,0 +1,43 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package client
import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)
func MarshalHeader(header *commonpb.MsgHeader) ([]byte, error) {
hb, err := proto.Marshal(header)
if err != nil {
return nil, err
}
return hb, nil
}
func UnmarshalHeader(headerbyte []byte) (*commonpb.MsgHeader, error) {
header := commonpb.MsgHeader{}
if headerbyte == nil {
return &header, fmt.Errorf("failed to unmarshal message header, payload is empty")
}
err := proto.Unmarshal(headerbyte, &header)
if err != nil {
return &header, err
}
if header.Base == nil {
return nil, fmt.Errorf("failed to unmarshal message, header is uncomplete")
}
return &header, nil
}

View File

@ -14,39 +14,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rmq
package server
import (
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
)
// rmqID wraps message ID for rocksmq
type rmqID struct {
messageID server.UniqueID
type RmqID struct {
MessageID UniqueID
}
// Check if rmqID implements MessageID interface
var _ mqwrapper.MessageID = &rmqID{}
var _ mqwrapper.MessageID = &RmqID{}
// Serialize convert rmq message id to []byte
func (rid *rmqID) Serialize() []byte {
return SerializeRmqID(rid.messageID)
func (rid *RmqID) Serialize() []byte {
return SerializeRmqID(rid.MessageID)
}
func (rid *rmqID) AtEarliestPosition() bool {
return rid.messageID <= 0
func (rid *RmqID) AtEarliestPosition() bool {
return rid.MessageID <= 0
}
func (rid *rmqID) LessOrEqualThan(msgID []byte) (bool, error) {
func (rid *RmqID) LessOrEqualThan(msgID []byte) (bool, error) {
rMsgID := DeserializeRmqID(msgID)
return rid.messageID <= rMsgID, nil
return rid.MessageID <= rMsgID, nil
}
func (rid *rmqID) Equal(msgID []byte) (bool, error) {
func (rid *RmqID) Equal(msgID []byte) (bool, error) {
rMsgID := DeserializeRmqID(msgID)
return rid.messageID == rMsgID, nil
return rid.MessageID == rMsgID, nil
}
// SerializeRmqID is used to serialize a message ID to byte array

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rmq
package server
import (
"math"
@ -24,8 +24,8 @@ import (
)
func TestRmqID_Serialize(t *testing.T) {
rid := &rmqID{
messageID: 8,
rid := &RmqID{
MessageID: 8,
}
bin := rid.Serialize()
@ -34,23 +34,23 @@ func TestRmqID_Serialize(t *testing.T) {
}
func Test_AtEarliestPosition(t *testing.T) {
rid := &rmqID{
messageID: 0,
rid := &RmqID{
MessageID: 0,
}
assert.True(t, rid.AtEarliestPosition())
rid = &rmqID{
messageID: math.MaxInt64,
rid = &RmqID{
MessageID: math.MaxInt64,
}
assert.False(t, rid.AtEarliestPosition())
}
func TestLessOrEqualThan(t *testing.T) {
rid1 := &rmqID{
messageID: 0,
rid1 := &RmqID{
MessageID: 0,
}
rid2 := &rmqID{
messageID: math.MaxInt64,
rid2 := &RmqID{
MessageID: math.MaxInt64,
}
ret, err := rid1.LessOrEqualThan(rid2.Serialize())
@ -67,12 +67,12 @@ func TestLessOrEqualThan(t *testing.T) {
}
func Test_Equal(t *testing.T) {
rid1 := &rmqID{
messageID: 0,
rid1 := &RmqID{
MessageID: 0,
}
rid2 := &rmqID{
messageID: math.MaxInt64,
rid2 := &RmqID{
MessageID: math.MaxInt64,
}
{

View File

@ -13,8 +13,7 @@ package server
// ProducerMessage that will be written to rocksdb
type ProducerMessage struct {
Payload []byte
Properties map[string]string
Payload []byte
}
// Consumer is rocksmq consumer
@ -26,9 +25,8 @@ type Consumer struct {
// ConsumerMessage that consumed from rocksdb
type ConsumerMessage struct {
MsgID UniqueID
Payload []byte
Properties map[string]string
MsgID UniqueID
Payload []byte
}
// RocksMQ is an interface thatmay be implemented by the application

View File

@ -12,7 +12,6 @@
package server
import (
"encoding/json"
"fmt"
"path"
"runtime"
@ -629,7 +628,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
if UniqueID(msgLen) != idEnd-idStart {
return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
}
// Insert data to store system
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
@ -639,16 +637,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
msgID := idStart + UniqueID(i)
key := path.Join(topicName, strconv.FormatInt(msgID, 10))
batch.PutCF(rmq.cfh[0], []byte(key), messages[i].Payload)
// batch.Put([]byte(key), messages[i].Payload)
if messages[i].Properties != nil {
properties, err := json.Marshal(messages[i].Properties)
if err != nil {
log.Warn("properties marshal failed", zap.Int64("msgID", msgID), zap.String("topicName", topicName),
zap.Error(err))
return nil, err
}
batch.PutCF(rmq.cfh[1], []byte(key), properties)
}
msgIDs[i] = msgID
msgSizes[msgID] = int64(len(messages[i].Payload))
}
@ -782,9 +770,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
defer readOpts.Destroy()
prefix := topicName + "/"
iter := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[0], typeutil.AddOne(prefix), readOpts)
iterProperty := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[1], typeutil.AddOne(prefix), readOpts)
defer iter.Close()
defer iterProperty.Close()
var dataKey string
if currentID == DefaultMessageID {
@ -793,7 +779,6 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
dataKey = path.Join(topicName, strconv.FormatInt(currentID, 10))
}
iter.Seek([]byte(dataKey))
iterProperty.Seek([]byte(dataKey))
consumerMessage := make([]ConsumerMessage, 0, n)
offset := 0
@ -801,11 +786,9 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
for ; iter.Valid() && offset < n; iter.Next() {
key := iter.Key()
val := iter.Value()
strKey := string(key.Data())
key.Free()
properties := make(map[string]string)
var propertiesValue []byte
strKey := string(key.Data())
msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64)
if err != nil {
val.Free()
@ -813,23 +796,6 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
}
offset++
if iterProperty.Valid() && string(iterProperty.Key().Data()) == string(iter.Key().Data()) {
// the key of properties is the same with the key of payload
// to prevent mix message with or without property column family
propertiesValue = iterProperty.Value().Data()
iterProperty.Next()
}
// between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload
// will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3
// before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
// when produce before 2.2.0, but consume after 2.2.0, propertiesValue will be []
if len(propertiesValue) != 0 {
if err = json.Unmarshal(propertiesValue, &properties); err != nil {
return nil, err
}
}
msg := ConsumerMessage{
MsgID: msgID,
}
@ -837,10 +803,8 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
dataLen := len(origData)
if dataLen == 0 {
msg.Payload = nil
msg.Properties = nil
} else {
msg.Payload = make([]byte, dataLen)
msg.Properties = properties
copy(msg.Payload, origData)
}
consumerMessage = append(consumerMessage, msg)

View File

@ -12,7 +12,6 @@
package server
import (
"encoding/json"
"fmt"
"os"
"path"
@ -31,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -160,98 +158,6 @@ func (rmq *rocksmq) produceBefore2(topicName string, messages []producerMessageB
return msgIDs, nil
}
// to test compatibility concern
func (rmq *rocksmq) produceIn2(topicName string, messages []ProducerMessage) ([]UniqueID, error) {
if rmq.isClosed() {
return nil, errors.New(RmqNotServingErrMsg)
}
start := time.Now()
ll, ok := topicMu.Load(topicName)
if !ok {
return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName)
}
lock.Lock()
defer lock.Unlock()
getLockTime := time.Since(start).Milliseconds()
msgLen := len(messages)
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
if err != nil {
return []UniqueID{}, err
}
allocTime := time.Since(start).Milliseconds()
if UniqueID(msgLen) != idEnd-idStart {
return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
}
// Insert data to store system
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
msgSizes := make(map[UniqueID]int64)
msgIDs := make([]UniqueID, msgLen)
for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
msgID := idStart + UniqueID(i)
key := path.Join(topicName, strconv.FormatInt(msgID, 10))
batch.Put([]byte(key), messages[i].Payload)
properties, err := json.Marshal(messages[i].Properties)
if err != nil {
log.Warn("properties marshal failed",
zap.Int64("msgID", msgID),
zap.String("topicName", topicName),
zap.Error(err))
return nil, err
}
pKey := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10))
batch.Put([]byte(pKey), properties)
msgIDs[i] = msgID
msgSizes[msgID] = int64(len(messages[i].Payload))
}
opts := gorocksdb.NewDefaultWriteOptions()
defer opts.Destroy()
err = rmq.store.Write(opts, batch)
if err != nil {
return []UniqueID{}, err
}
writeTime := time.Since(start).Milliseconds()
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
}
// Update message page info
err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
if err != nil {
return []UniqueID{}, err
}
// TODO add this to monitor metrics
getProduceTime := time.Since(start).Milliseconds()
if getProduceTime > 200 {
log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
zap.Int64("get lock elapse", getLockTime),
zap.Int64("alloc elapse", allocTime-getLockTime),
zap.Int64("write elapse", writeTime-allocTime),
zap.Int64("updatePage elapse", getProduceTime-writeTime),
zap.Int64("produce total elapse", getProduceTime),
)
}
rmq.topicLastID.Store(topicName, msgIDs[len(msgIDs)-1])
return msgIDs, nil
}
func TestRocksmq_RegisterConsumer(t *testing.T) {
suffix := "_register"
kvPath := rmqPath + kvPathSuffix + suffix
@ -263,6 +169,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
defer os.RemoveAll(rocksdbPath)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -327,6 +234,7 @@ func TestRocksmq_Basic(t *testing.T) {
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -338,14 +246,14 @@ func TestRocksmq_Basic(t *testing.T) {
msgA := "a_message"
pMsgs := make([]ProducerMessage, 1)
pMsgA := ProducerMessage{Payload: []byte(msgA), Properties: map[string]string{common.TraceIDKey: "a"}}
pMsgA := ProducerMessage{Payload: []byte(msgA)}
pMsgs[0] = pMsgA
_, err = rmq.Produce(channelName, pMsgs)
assert.NoError(t, err)
pMsgB := ProducerMessage{Payload: []byte("b_message"), Properties: map[string]string{common.TraceIDKey: "b"}}
pMsgC := ProducerMessage{Payload: []byte("c_message"), Properties: map[string]string{common.TraceIDKey: "c"}}
pMsgB := ProducerMessage{Payload: []byte("b_message")}
pMsgC := ProducerMessage{Payload: []byte("c_message")}
pMsgs[0] = pMsgB
pMsgs = append(pMsgs, pMsgC)
@ -363,121 +271,12 @@ func TestRocksmq_Basic(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(cMsgs), 1)
assert.Equal(t, string(cMsgs[0].Payload), "a_message")
_, ok := cMsgs[0].Properties[common.TraceIDKey]
assert.True(t, ok)
assert.Equal(t, cMsgs[0].Properties[common.TraceIDKey], "a")
cMsgs, err = rmq.Consume(channelName, groupName, 2)
assert.NoError(t, err)
assert.Equal(t, len(cMsgs), 2)
assert.Equal(t, string(cMsgs[0].Payload), "b_message")
_, ok = cMsgs[0].Properties[common.TraceIDKey]
assert.True(t, ok)
assert.Equal(t, cMsgs[0].Properties[common.TraceIDKey], "b")
assert.Equal(t, string(cMsgs[1].Payload), "c_message")
_, ok = cMsgs[1].Properties[common.TraceIDKey]
assert.True(t, ok)
assert.Equal(t, cMsgs[1].Properties[common.TraceIDKey], "c")
}
func TestRocksmq_Compatibility(t *testing.T) {
suffix := "rmq_compatibility"
kvPath := rmqPath + kvPathSuffix + suffix
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
paramtable.Init()
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
channelName := "channel_rocks"
err = rmq.CreateTopic(channelName)
assert.NoError(t, err)
defer rmq.DestroyTopic(channelName)
// before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
// it aims to test if produce before 2.2.0, will consume after 2.2.0 successfully
msgD := "d_message"
tMsgs := make([]producerMessageBefore2, 1)
tMsgD := producerMessageBefore2{Payload: []byte(msgD)}
tMsgs[0] = tMsgD
_, err = rmq.produceBefore2(channelName, tMsgs)
assert.NoError(t, err)
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(channelName, groupName)
err = rmq.CreateConsumerGroup(channelName, groupName)
assert.NoError(t, err)
cMsgs, err := rmq.Consume(channelName, groupName, 1)
if err != nil {
log.Info("test", zap.Any("err", err))
}
assert.NoError(t, err)
assert.Equal(t, len(cMsgs), 1)
assert.Equal(t, string(cMsgs[0].Payload), "d_message")
_, ok := cMsgs[0].Properties[common.TraceIDKey]
assert.False(t, ok)
// it will be set empty map if produce message has no properties field
expect := make(map[string]string)
assert.Equal(t, cMsgs[0].Properties, expect)
// between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload
// will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3
// after 2.3, the properties will be stored in column families
// it aims to test if produce in 2.2.0, but consume in 2.3.0, will get properties successfully
msg1 := "1_message"
tMsgs1 := make([]ProducerMessage, 1)
properties := make(map[string]string)
properties[common.TraceIDKey] = "1"
tMsg1 := ProducerMessage{Payload: []byte(msg1), Properties: properties}
tMsgs1[0] = tMsg1
_, err = rmq.produceIn2(channelName, tMsgs1)
assert.NoError(t, err)
msg2, err := rmq.Consume(channelName, groupName, 1)
assert.NoError(t, err)
assert.Equal(t, len(msg2), 1)
assert.Equal(t, string(msg2[0].Payload), "1_message")
_, ok = msg2[0].Properties[common.TraceIDKey]
assert.False(t, ok)
// will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3
expect = make(map[string]string)
assert.Equal(t, cMsgs[0].Properties, expect)
// between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload
// after 2.3, the properties will be stored in column families
// it aims to test the mixed message before 2.3.0 and after 2.3.0, will get properties successfully
msg3 := "3_message"
tMsgs3 := make([]ProducerMessage, 2)
properties3 := make(map[string]string)
properties3[common.TraceIDKey] = "3"
tMsg3 := ProducerMessage{Payload: []byte(msg3), Properties: properties3}
tMsgs3[0] = tMsg3
msg4 := "4_message"
tMsg4 := ProducerMessage{Payload: []byte(msg4)}
tMsgs3[1] = tMsg4
_, err = rmq.Produce(channelName, tMsgs3)
assert.NoError(t, err)
msg5, err := rmq.Consume(channelName, groupName, 2)
assert.NoError(t, err)
assert.Equal(t, len(msg5), 2)
assert.Equal(t, string(msg5[0].Payload), "3_message")
_, ok = msg5[0].Properties[common.TraceIDKey]
assert.True(t, ok)
assert.Equal(t, msg5[0].Properties, properties3)
assert.Equal(t, string(msg5[1].Payload), "4_message")
_, ok = msg5[1].Properties[common.TraceIDKey]
assert.False(t, ok)
// it will be set empty map if produce message has no properties field
expect = make(map[string]string)
assert.Equal(t, msg5[1].Properties, expect)
}
func TestRocksmq_MultiConsumer(t *testing.T) {
@ -492,6 +291,7 @@ func TestRocksmq_MultiConsumer(t *testing.T) {
params := paramtable.Get()
params.Save(params.RocksmqCfg.PageSize.Key, "10")
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -544,6 +344,7 @@ func TestRocksmq_Dummy(t *testing.T) {
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -614,10 +415,12 @@ func TestRocksmq_Seek(t *testing.T) {
defer os.RemoveAll(rocksdbPath)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
_, err = NewRocksMQ("", idAllocator)
assert.Error(t, err)
defer os.RemoveAll("_meta_kv")
@ -681,6 +484,7 @@ func TestRocksmq_Loop(t *testing.T) {
defer os.RemoveAll(kvName)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -753,6 +557,7 @@ func TestRocksmq_Goroutines(t *testing.T) {
defer os.RemoveAll(kvName)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -832,6 +637,7 @@ func TestRocksmq_Throughout(t *testing.T) {
defer os.RemoveAll(kvName)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -897,6 +703,7 @@ func TestRocksmq_MultiChan(t *testing.T) {
defer os.RemoveAll(kvName)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -951,6 +758,7 @@ func TestRocksmq_CopyData(t *testing.T) {
defer os.RemoveAll(kvName)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1019,6 +827,7 @@ func TestRocksmq_SeekToLatest(t *testing.T) {
defer os.RemoveAll(kvName)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1110,6 +919,7 @@ func TestRocksmq_GetLatestMsg(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
@ -1178,6 +988,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) {
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
paramtable.Init()
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1233,6 +1044,7 @@ func TestRocksmq_Close(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1265,6 +1077,7 @@ func TestRocksmq_SeekWithNoConsumerError(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1290,6 +1103,7 @@ func TestRocksmq_SeekTopicNotExistError(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1312,6 +1126,7 @@ func TestRocksmq_SeekTopicMutexError(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1335,6 +1150,7 @@ func TestRocksmq_moveConsumePosError(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1359,6 +1175,7 @@ func TestRocksmq_updateAckedInfoErr(t *testing.T) {
defer os.RemoveAll(kvName)
params := paramtable.Get()
params.Save(params.RocksmqCfg.PageSize.Key, "10")
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()
@ -1418,6 +1235,7 @@ func TestRocksmq_Info(t *testing.T) {
defer os.RemoveAll(kvName)
params := paramtable.Get()
params.Save(params.RocksmqCfg.PageSize.Key, "10")
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
rmq, err := NewRocksMQ(name, idAllocator)
assert.NoError(t, err)
defer rmq.Close()

View File

@ -84,7 +84,7 @@ func (rc *rmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Con
log.Warn("unexpected subscription consumer options", zap.Error(err))
return nil, err
}
receiveChannel := make(chan client.Message, options.BufSize)
receiveChannel := make(chan mqwrapper.Message, options.BufSize)
cli, err := rc.client.Subscribe(client.ConsumerOptions{
Topic: options.Topic,
@ -108,7 +108,7 @@ func (rc *rmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Con
// EarliestMessageID returns the earliest message ID for rmq client
func (rc *rmqClient) EarliestMessageID() mqwrapper.MessageID {
rID := client.EarliestMessageID()
return &rmqID{messageID: rID}
return &server.RmqID{MessageID: rID}
}
// StringToMsgID converts string id to MessageID
@ -117,13 +117,13 @@ func (rc *rmqClient) StringToMsgID(id string) (mqwrapper.MessageID, error) {
if err != nil {
return nil, err
}
return &rmqID{messageID: rID}, nil
return &server.RmqID{MessageID: rID}, nil
}
// BytesToMsgID converts a byte array to messageID
func (rc *rmqClient) BytesToMsgID(id []byte) (mqwrapper.MessageID, error) {
rID := DeserializeRmqID(id)
return &rmqID{messageID: rID}, nil
rID := server.DeserializeRmqID(id)
return &server.RmqID{MessageID: rID}, nil
}
func (rc *rmqClient) Close() {

View File

@ -43,6 +43,7 @@ func TestMain(m *testing.M) {
rand.Seed(time.Now().UnixNano())
path := "/tmp/milvus/rdb_data"
defer os.RemoveAll(path)
paramtable.Get().Save("rocksmq.compressionTypes", "0,0,0,0,0")
_ = rocksmqimplserver.InitRocksMQ(path)
exitCode := m.Run()
defer rocksmqimplserver.CloseRocksMQ()
@ -66,11 +67,11 @@ func TestRmqClient_CreateProducer(t *testing.T) {
topic := "TestRmqClient_CreateProducer"
proOpts := mqwrapper.ProducerOptions{Topic: topic}
producer, err := client.CreateProducer(proOpts)
defer producer.Close()
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
rmqProducer := producer.(*rmqProducer)
defer rmqProducer.Close()
assert.Equal(t, rmqProducer.Topic(), topic)
@ -150,9 +151,9 @@ func TestRmqClient_Subscribe(t *testing.T) {
topic := "TestRmqClient_Subscribe"
proOpts := mqwrapper.ProducerOptions{Topic: topic}
producer, err := client.CreateProducer(proOpts)
defer producer.Close()
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
subName := "subName"
consumerOpts := mqwrapper.ConsumerOptions{
@ -197,7 +198,7 @@ func TestRmqClient_Subscribe(t *testing.T) {
assert.FailNow(t, "consumer failed to yield message in 100 milliseconds")
case msg := <-consumer.Chan():
consumer.Ack(msg)
rmqmsg := msg.(*rmqMessage)
rmqmsg := msg.(*rocksmqimplclient.RmqMessage)
msgPayload := rmqmsg.Payload()
assert.NotEmpty(t, msgPayload)
msgTopic := rmqmsg.Topic()
@ -205,7 +206,7 @@ func TestRmqClient_Subscribe(t *testing.T) {
msgProp := rmqmsg.Properties()
assert.Empty(t, msgProp)
msgID := rmqmsg.ID()
rID := msgID.(*rmqID)
rID := msgID.(*rocksmqimplserver.RmqID)
assert.NotZero(t, rID)
}
}

View File

@ -21,6 +21,7 @@ import (
"sync/atomic"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/client"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
)
@ -57,7 +58,7 @@ func (rc *Consumer) Chan() <-chan mqwrapper.Message {
skip := atomic.LoadInt32(&rc.skip)
if skip != 1 {
select {
case rc.msgChannel <- &rmqMessage{msg: msg}:
case rc.msgChannel <- msg:
case <-rc.closeCh:
// if consumer closed, enter close branch below
}
@ -78,7 +79,7 @@ func (rc *Consumer) Chan() <-chan mqwrapper.Message {
// Seek is used to seek the position in rocksmq topic
func (rc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
msgID := id.(*rmqID).messageID
msgID := id.(*server.RmqID).MessageID
// skip the first message when consume
if !inclusive {
atomic.StoreInt32(&rc.skip, 1)
@ -98,7 +99,7 @@ func (rc *Consumer) Close() {
func (rc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
msgID, err := rc.c.GetLatestMsgID()
return &rmqID{messageID: msgID}, err
return &server.RmqID{MessageID: msgID}, err
}
func (rc *Consumer) CheckTopicValid(topic string) error {

View File

@ -15,6 +15,7 @@ import (
"context"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/client"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/timerecord"
@ -37,17 +38,16 @@ func (rp *rmqProducer) Send(ctx context.Context, message *mqwrapper.ProducerMess
start := timerecord.NewTimeRecorder("send msg to stream")
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc()
pm := &client.ProducerMessage{Payload: message.Payload, Properties: message.Properties}
id, err := rp.p.Send(pm)
id, err := rp.p.Send(message)
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
return &rmqID{messageID: id}, err
return &server.RmqID{MessageID: id}, err
}
elapsed := start.ElapseSpan()
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc()
return &rmqID{messageID: id}, nil
return &server.RmqID{MessageID: id}, nil
}
// Close does nothing currently