mirror of https://github.com/milvus-io/milvus.git
Remove TraceID in mq (#20697)
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/20843/head
parent
b021d7c59c
commit
d9185485f1
|
@ -87,3 +87,8 @@ const (
|
|||
const (
|
||||
CollectionTTLConfigKey = "collection.ttl.seconds"
|
||||
)
|
||||
|
||||
const (
|
||||
PropertiesKey string = "properties"
|
||||
TraceIDKey string = "uber-trace-id"
|
||||
)
|
||||
|
|
|
@ -154,7 +154,8 @@ func TestClient_SeekLatest(t *testing.T) {
|
|||
assert.NotNil(t, producer)
|
||||
assert.NoError(t, err)
|
||||
msg := &ProducerMessage{
|
||||
Payload: make([]byte, 10),
|
||||
Payload: make([]byte, 10),
|
||||
Properties: map[string]string{},
|
||||
}
|
||||
id, err := producer.Send(msg)
|
||||
assert.Nil(t, err)
|
||||
|
|
|
@ -44,9 +44,10 @@ type ConsumerOptions struct {
|
|||
// Message is the message content of a consumer message
|
||||
type Message struct {
|
||||
Consumer
|
||||
MsgID UniqueID
|
||||
Topic string
|
||||
Payload []byte
|
||||
MsgID UniqueID
|
||||
Topic string
|
||||
Payload []byte
|
||||
Properties map[string]string
|
||||
}
|
||||
|
||||
// Consumer interface provide operations for a consumer
|
||||
|
|
|
@ -18,7 +18,8 @@ type ProducerOptions struct {
|
|||
|
||||
// ProducerMessage is the message of a producer
|
||||
type ProducerMessage struct {
|
||||
Payload []byte
|
||||
Payload []byte
|
||||
Properties map[string]string
|
||||
}
|
||||
|
||||
// Producer provedes some operations for a producer
|
||||
|
|
|
@ -52,7 +52,8 @@ func (p *producer) Topic() string {
|
|||
func (p *producer) Send(message *ProducerMessage) (UniqueID, error) {
|
||||
ids, err := p.c.server.Produce(p.topic, []server.ProducerMessage{
|
||||
{
|
||||
Payload: message.Payload,
|
||||
Payload: message.Payload,
|
||||
Properties: message.Properties,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
|
|
|
@ -13,7 +13,8 @@ package server
|
|||
|
||||
// ProducerMessage that will be written to rocksdb
|
||||
type ProducerMessage struct {
|
||||
Payload []byte
|
||||
Payload []byte
|
||||
Properties map[string]string
|
||||
}
|
||||
|
||||
// Consumer is rocksmq consumer
|
||||
|
@ -25,8 +26,9 @@ type Consumer struct {
|
|||
|
||||
// ConsumerMessage that consumed from rocksdb
|
||||
type ConsumerMessage struct {
|
||||
MsgID UniqueID
|
||||
Payload []byte
|
||||
MsgID UniqueID
|
||||
Payload []byte
|
||||
Properties map[string]string
|
||||
}
|
||||
|
||||
// RocksMQ is an interface thatmay be implemented by the application
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
|
@ -26,6 +27,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
|
@ -274,7 +276,7 @@ func (rmq *rocksmq) Close() {
|
|||
log.Info("Successfully close rocksmq")
|
||||
}
|
||||
|
||||
//print rmq consumer Info
|
||||
// print rmq consumer Info
|
||||
func (rmq *rocksmq) Info() bool {
|
||||
rtn := true
|
||||
rmq.consumers.Range(func(key, vals interface{}) bool {
|
||||
|
@ -594,6 +596,16 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
|
|||
msgID := idStart + UniqueID(i)
|
||||
key := path.Join(topicName, strconv.FormatInt(msgID, 10))
|
||||
batch.Put([]byte(key), messages[i].Payload)
|
||||
properties, err := json.Marshal(messages[i].Properties)
|
||||
if err != nil {
|
||||
log.Warn("properties marshal failed",
|
||||
zap.Int64("msgID", msgID),
|
||||
zap.String("topicName", topicName),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
pKey := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10))
|
||||
batch.Put([]byte(pKey), properties)
|
||||
msgIDs[i] = msgID
|
||||
msgSizes[msgID] = int64(len(messages[i].Payload))
|
||||
}
|
||||
|
@ -724,6 +736,17 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
val.Free()
|
||||
return nil, err
|
||||
}
|
||||
askedProperties := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10))
|
||||
opts := gorocksdb.NewDefaultReadOptions()
|
||||
defer opts.Destroy()
|
||||
propertiesValue, err := rmq.store.GetBytes(opts, []byte(askedProperties))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var properties map[string]string
|
||||
if err = json.Unmarshal(propertiesValue, &properties); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msg := ConsumerMessage{
|
||||
MsgID: msgID,
|
||||
}
|
||||
|
@ -731,8 +754,10 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
dataLen := len(origData)
|
||||
if dataLen == 0 {
|
||||
msg.Payload = nil
|
||||
msg.Properties = nil
|
||||
} else {
|
||||
msg.Payload = make([]byte, dataLen)
|
||||
msg.Properties = properties
|
||||
copy(msg.Payload, origData)
|
||||
}
|
||||
consumerMessage = append(consumerMessage, msg)
|
||||
|
@ -850,7 +875,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
|
|||
return nil
|
||||
}
|
||||
|
||||
//Only for test
|
||||
// Only for test
|
||||
func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID) error {
|
||||
log.Warn("Use method ForceSeek that only for test")
|
||||
if rmq.isClosed() {
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
|
@ -151,14 +152,14 @@ func TestRocksmq_Basic(t *testing.T) {
|
|||
|
||||
msgA := "a_message"
|
||||
pMsgs := make([]ProducerMessage, 1)
|
||||
pMsgA := ProducerMessage{Payload: []byte(msgA)}
|
||||
pMsgA := ProducerMessage{Payload: []byte(msgA), Properties: map[string]string{common.TraceIDKey: "a"}}
|
||||
pMsgs[0] = pMsgA
|
||||
|
||||
_, err = rmq.Produce(channelName, pMsgs)
|
||||
assert.Nil(t, err)
|
||||
|
||||
pMsgB := ProducerMessage{Payload: []byte("b_message")}
|
||||
pMsgC := ProducerMessage{Payload: []byte("c_message")}
|
||||
pMsgB := ProducerMessage{Payload: []byte("b_message"), Properties: map[string]string{common.TraceIDKey: "b"}}
|
||||
pMsgC := ProducerMessage{Payload: []byte("c_message"), Properties: map[string]string{common.TraceIDKey: "c"}}
|
||||
|
||||
pMsgs[0] = pMsgB
|
||||
pMsgs = append(pMsgs, pMsgC)
|
||||
|
@ -176,12 +177,21 @@ func TestRocksmq_Basic(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Equal(t, len(cMsgs), 1)
|
||||
assert.Equal(t, string(cMsgs[0].Payload), "a_message")
|
||||
_, ok := cMsgs[0].Properties[common.TraceIDKey]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, cMsgs[0].Properties[common.TraceIDKey], "a")
|
||||
|
||||
cMsgs, err = rmq.Consume(channelName, groupName, 2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, len(cMsgs), 2)
|
||||
assert.Equal(t, string(cMsgs[0].Payload), "b_message")
|
||||
_, ok = cMsgs[0].Properties[common.TraceIDKey]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, cMsgs[0].Properties[common.TraceIDKey], "b")
|
||||
assert.Equal(t, string(cMsgs[1].Payload), "c_message")
|
||||
_, ok = cMsgs[1].Properties[common.TraceIDKey]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, cMsgs[1].Properties[common.TraceIDKey], "c")
|
||||
}
|
||||
|
||||
func TestRocksmq_MultiConsumer(t *testing.T) {
|
||||
|
@ -509,15 +519,17 @@ func TestRocksmq_Goroutines(t *testing.T) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
/**
|
||||
This test is aim to measure RocksMq throughout.
|
||||
Hardware:
|
||||
CPU Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
|
||||
Disk SSD
|
||||
/*
|
||||
*
|
||||
|
||||
Test with 1,000,000 message, result is as follow:
|
||||
Produce: 190000 message / s
|
||||
Consume: 90000 message / s
|
||||
This test is aim to measure RocksMq throughout.
|
||||
Hardware:
|
||||
CPU Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
|
||||
Disk SSD
|
||||
|
||||
Test with 1,000,000 message, result is as follow:
|
||||
Produce: 190000 message / s
|
||||
Consume: 90000 message / s
|
||||
*/
|
||||
func TestRocksmq_Throughout(t *testing.T) {
|
||||
ep := etcdEndpoints()
|
||||
|
|
|
@ -275,7 +275,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
|
|||
|
||||
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
|
||||
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
|
||||
trace.InjectContextToMsgProperties(sp.Context(), msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
if _, err := ms.producers[channel].Send(
|
||||
|
@ -341,7 +341,7 @@ func (ms *mqMsgStream) ProduceMark(msgPack *MsgPack) (map[string][]MessageID, er
|
|||
|
||||
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
|
||||
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
|
||||
trace.InjectContextToMsgProperties(sp.Context(), msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
id, err := ms.producers[channel].Send(
|
||||
|
@ -384,7 +384,7 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
|
|||
|
||||
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
|
||||
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
|
||||
trace.InjectContextToMsgProperties(sp.Context(), msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
for _, producer := range ms.producers {
|
||||
|
@ -426,7 +426,7 @@ func (ms *mqMsgStream) BroadcastMark(msgPack *MsgPack) (map[string][]MessageID,
|
|||
|
||||
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
|
||||
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
|
||||
trace.InjectContextToMsgProperties(sp.Context(), msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
for channel, producer := range ms.producers {
|
||||
|
@ -504,7 +504,7 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) {
|
|||
Timestamp: tsMsg.BeginTs(),
|
||||
})
|
||||
|
||||
sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
|
||||
sp, ok := ExtractFromMsgProperties(tsMsg, msg.Properties())
|
||||
if ok {
|
||||
tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
|
||||
}
|
||||
|
@ -810,7 +810,7 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) {
|
|||
continue
|
||||
}
|
||||
|
||||
sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
|
||||
sp, ok := ExtractFromMsgProperties(tsMsg, msg.Properties())
|
||||
if ok {
|
||||
tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
|
||||
}
|
||||
|
|
|
@ -172,7 +172,9 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) {
|
|||
rand.Seed(time.Now().UnixNano())
|
||||
topic := fmt.Sprintf("test-topic-%d", rand.Int())
|
||||
subName := fmt.Sprintf("test-subname-%d", rand.Int())
|
||||
arr := []int{111, 222, 333, 444, 555, 666, 777}
|
||||
arr1 := []int{111, 222, 333, 444, 555, 666, 777}
|
||||
arr2 := []string{"111", "222", "333", "444", "555", "666", "777"}
|
||||
|
||||
c := make(chan mqwrapper.MessageID, 1)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -183,7 +185,7 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) {
|
|||
|
||||
producer := createProducer(t, kc, topic)
|
||||
defer producer.Close()
|
||||
produceData(ctx, t, producer, arr)
|
||||
produceData(ctx, t, producer, arr1, arr2)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
ctx1, cancel1 := context.WithTimeout(ctx, 5*time.Second)
|
||||
|
@ -203,9 +205,9 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) {
|
|||
cancel3()
|
||||
|
||||
cancel()
|
||||
assert.Equal(t, len(arr), total1+total2)
|
||||
assert.Equal(t, len(arr1), total1+total2)
|
||||
|
||||
assert.Equal(t, len(arr), total3)
|
||||
assert.Equal(t, len(arr1), total3)
|
||||
}
|
||||
|
||||
func TestKafkaClient_SeekPosition(t *testing.T) {
|
||||
|
@ -220,8 +222,9 @@ func TestKafkaClient_SeekPosition(t *testing.T) {
|
|||
producer := createProducer(t, kc, topic)
|
||||
defer producer.Close()
|
||||
|
||||
data := []int{1, 2, 3}
|
||||
ids := produceData(ctx, t, producer, data)
|
||||
data1 := []int{1, 2, 3}
|
||||
data2 := []string{"1", "2", "3"}
|
||||
ids := produceData(ctx, t, producer, data1, data2)
|
||||
|
||||
consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionUnknown)
|
||||
defer consumer.Close()
|
||||
|
@ -233,6 +236,7 @@ func TestKafkaClient_SeekPosition(t *testing.T) {
|
|||
case msg := <-consumer.Chan():
|
||||
consumer.Ack(msg)
|
||||
assert.Equal(t, 3, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "3", msg.Properties()[common.TraceIDKey])
|
||||
case <-time.After(10 * time.Second):
|
||||
assert.FailNow(t, "should not wait")
|
||||
}
|
||||
|
@ -250,22 +254,25 @@ func TestKafkaClient_ConsumeFromLatest(t *testing.T) {
|
|||
producer := createProducer(t, kc, topic)
|
||||
defer producer.Close()
|
||||
|
||||
data := []int{1, 2}
|
||||
produceData(ctx, t, producer, data)
|
||||
data1 := []int{1, 2}
|
||||
data2 := []string{"1", "2"}
|
||||
produceData(ctx, t, producer, data1, data2)
|
||||
|
||||
consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionLatest)
|
||||
defer consumer.Close()
|
||||
|
||||
go func() {
|
||||
time.Sleep(time.Second * 2)
|
||||
data := []int{3}
|
||||
produceData(ctx, t, producer, data)
|
||||
data1 := []int{3}
|
||||
data2 := []string{"3"}
|
||||
produceData(ctx, t, producer, data1, data2)
|
||||
}()
|
||||
|
||||
select {
|
||||
case msg := <-consumer.Chan():
|
||||
consumer.Ack(msg)
|
||||
assert.Equal(t, 3, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "3", msg.Properties()[common.TraceIDKey])
|
||||
case <-time.After(5 * time.Second):
|
||||
assert.FailNow(t, "should not wait")
|
||||
}
|
||||
|
@ -380,12 +387,14 @@ func createProducer(t *testing.T, kc *kafkaClient, topic string) mqwrapper.Produ
|
|||
return producer
|
||||
}
|
||||
|
||||
func produceData(ctx context.Context, t *testing.T, producer mqwrapper.Producer, arr []int) []mqwrapper.MessageID {
|
||||
func produceData(ctx context.Context, t *testing.T, producer mqwrapper.Producer, arr []int, pArr []string) []mqwrapper.MessageID {
|
||||
var msgIDs []mqwrapper.MessageID
|
||||
for _, v := range arr {
|
||||
for k, v := range arr {
|
||||
msg := &mqwrapper.ProducerMessage{
|
||||
Payload: IntToBytes(v),
|
||||
Properties: map[string]string{},
|
||||
Payload: IntToBytes(v),
|
||||
Properties: map[string]string{
|
||||
common.TraceIDKey: pArr[k],
|
||||
},
|
||||
}
|
||||
msgID, err := producer.Send(ctx, msg)
|
||||
msgIDs = append(msgIDs, msgID)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -34,8 +35,9 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
|
||||
data := []int{111, 222, 333}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 := []int{111, 222, 333}
|
||||
data2 := []string{"111", "222", "333"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
|
||||
msgID := &kafkaID{messageID: 1}
|
||||
err = consumer.Seek(msgID, false)
|
||||
|
@ -43,6 +45,7 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) {
|
|||
|
||||
msg := <-consumer.Chan()
|
||||
assert.Equal(t, 333, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "333", msg.Properties()[common.TraceIDKey])
|
||||
assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID)
|
||||
assert.Equal(t, topic, msg.Topic())
|
||||
assert.True(t, len(msg.Properties()) == 0)
|
||||
|
@ -58,8 +61,9 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
|
||||
data := []int{111, 222, 333}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 := []int{111, 222, 333}
|
||||
data2 := []string{"111", "222", "333"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
|
||||
msgID := &kafkaID{messageID: 1}
|
||||
err = consumer.Seek(msgID, true)
|
||||
|
@ -67,6 +71,7 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) {
|
|||
|
||||
msg := <-consumer.Chan()
|
||||
assert.Equal(t, 222, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "222", msg.Properties()[common.TraceIDKey])
|
||||
assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID)
|
||||
assert.Equal(t, topic, msg.Topic())
|
||||
assert.True(t, len(msg.Properties()) == 0)
|
||||
|
@ -99,8 +104,9 @@ func TestKafkaConsumer_ChanWithNoAssign(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
|
||||
data := []int{111}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 := []int{111}
|
||||
data2 := []string{"111"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
assert.Panics(t, func() {
|
||||
<-consumer.Chan()
|
||||
})
|
||||
|
@ -135,10 +141,12 @@ func TestKafkaConsumer_SeekAfterChan(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
|
||||
data := []int{111}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 := []int{111}
|
||||
data2 := []string{"111"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
msg := <-consumer.Chan()
|
||||
assert.Equal(t, 111, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "111", msg.Properties()[common.TraceIDKey])
|
||||
|
||||
err = consumer.Seek(mockMsgID{}, false)
|
||||
assert.Error(t, err)
|
||||
|
@ -158,8 +166,9 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
|
|||
assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
data := []int{111, 222, 333}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 := []int{111, 222, 333}
|
||||
data2 := []string{"111", "222", "333"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
|
||||
latestMsgID, err = consumer.GetLatestMsgID()
|
||||
assert.Equal(t, int64(2), latestMsgID.(*kafkaID).messageID)
|
||||
|
@ -171,20 +180,24 @@ func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) {
|
|||
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
||||
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
||||
|
||||
data := []int{111, 222, 333}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 := []int{111, 222, 333}
|
||||
data2 := []string{"111", "222", "333"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
|
||||
config := createConfig(groupID)
|
||||
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionLatest)
|
||||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
data = []int{444, 555}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 = []int{444, 555}
|
||||
data2 = []string{"444", "555"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
|
||||
msg := <-consumer.Chan()
|
||||
assert.Equal(t, 444, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "444", msg.Properties()[common.TraceIDKey])
|
||||
msg = <-consumer.Chan()
|
||||
assert.Equal(t, 555, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "555", msg.Properties()[common.TraceIDKey])
|
||||
}
|
||||
|
||||
func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) {
|
||||
|
@ -192,14 +205,16 @@ func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) {
|
|||
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
||||
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
||||
|
||||
data := []int{111, 222, 333}
|
||||
testKafkaConsumerProduceData(t, topic, data)
|
||||
data1 := []int{111, 222, 333}
|
||||
data2 := []string{"111", "222", "333"}
|
||||
testKafkaConsumerProduceData(t, topic, data1, data2)
|
||||
|
||||
config := createConfig(groupID)
|
||||
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
|
||||
assert.NoError(t, err)
|
||||
msg := <-consumer.Chan()
|
||||
assert.Equal(t, 111, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "111", msg.Properties()[common.TraceIDKey])
|
||||
consumer.Ack(msg)
|
||||
defer consumer.Close()
|
||||
|
||||
|
@ -208,6 +223,7 @@ func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
msg = <-consumer2.Chan()
|
||||
assert.Equal(t, 111, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "111", msg.Properties()[common.TraceIDKey])
|
||||
consumer2.Ack(msg)
|
||||
defer consumer2.Close()
|
||||
}
|
||||
|
@ -218,14 +234,14 @@ func TestKafkaConsumer_createKafkaConsumer(t *testing.T) {
|
|||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
|
||||
func testKafkaConsumerProduceData(t *testing.T, topic string, data []int, arr []string) {
|
||||
ctx := context.Background()
|
||||
kc := createKafkaClient(t)
|
||||
defer kc.Close()
|
||||
producer := createProducer(t, kc, topic)
|
||||
defer producer.Close()
|
||||
|
||||
produceData(ctx, t, producer, data)
|
||||
produceData(ctx, t, producer, data, arr)
|
||||
|
||||
producer.(*kafkaProducer).p.Flush(500)
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package kafka
|
|||
|
||||
import (
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
|
||||
)
|
||||
|
||||
|
@ -14,7 +15,17 @@ func (km *kafkaMessage) Topic() string {
|
|||
}
|
||||
|
||||
func (km *kafkaMessage) Properties() map[string]string {
|
||||
return nil
|
||||
if len(km.msg.Headers) == 0 {
|
||||
return nil
|
||||
}
|
||||
var properties map[string]string
|
||||
for i := 0; i < len(km.msg.Headers); i++ {
|
||||
if _, ok := properties[km.msg.Headers[i].Key]; ok {
|
||||
log.Info("Repeated key in kafka message headers")
|
||||
}
|
||||
properties[km.msg.Headers[i].Key] = string(km.msg.Headers[i].Value)
|
||||
}
|
||||
return properties
|
||||
}
|
||||
|
||||
func (km *kafkaMessage) Payload() []byte {
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
|
||||
func TestKafkaMessage_All(t *testing.T) {
|
||||
topic := "t"
|
||||
msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: 0}, Value: nil}
|
||||
msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: 0}, Value: nil, Headers: nil}
|
||||
km := &kafkaMessage{msg: msg}
|
||||
assert.Equal(t, topic, km.Topic())
|
||||
assert.Equal(t, int64(0), km.ID().(*kafkaID).messageID)
|
||||
|
|
|
@ -26,9 +26,15 @@ func (kp *kafkaProducer) Topic() string {
|
|||
}
|
||||
|
||||
func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
|
||||
var headers []kafka.Header
|
||||
for key, value := range message.Properties {
|
||||
header := kafka.Header{Key: key, Value: []byte(value)}
|
||||
headers = append(headers, header)
|
||||
}
|
||||
err := kp.p.Produce(&kafka.Message{
|
||||
TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
|
||||
Value: message.Payload,
|
||||
Headers: headers,
|
||||
}, kp.deliveryChan)
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -425,11 +425,14 @@ func TestPulsarClient_SeekPosition(t *testing.T) {
|
|||
|
||||
log.Info("Produce start")
|
||||
ids := []mqwrapper.MessageID{}
|
||||
arr := []int{1, 2, 3}
|
||||
for _, v := range arr {
|
||||
arr1 := []int{1, 2, 3}
|
||||
arr2 := []string{"1", "2", "3"}
|
||||
for k, v := range arr1 {
|
||||
msg := &mqwrapper.ProducerMessage{
|
||||
Payload: IntToBytes(v),
|
||||
Properties: map[string]string{},
|
||||
Payload: IntToBytes(v),
|
||||
Properties: map[string]string{
|
||||
common.TraceIDKey: arr2[k],
|
||||
},
|
||||
}
|
||||
id, err := producer.Send(ctx, msg)
|
||||
ids = append(ids, id)
|
||||
|
@ -459,6 +462,7 @@ func TestPulsarClient_SeekPosition(t *testing.T) {
|
|||
assert.Equal(t, seekID.EntryID(), msg.ID().EntryID())
|
||||
assert.Equal(t, seekID.PartitionIdx(), msg.ID().PartitionIdx())
|
||||
assert.Equal(t, 3, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "3", msg.Properties()[common.TraceIDKey])
|
||||
case <-time.After(2 * time.Second):
|
||||
assert.FailNow(t, "should not wait")
|
||||
}
|
||||
|
@ -475,6 +479,7 @@ func TestPulsarClient_SeekPosition(t *testing.T) {
|
|||
assert.Equal(t, seekID.EntryID(), msg.ID().EntryID())
|
||||
assert.Equal(t, seekID.PartitionIdx(), msg.ID().PartitionIdx())
|
||||
assert.Equal(t, 2, BytesToInt(msg.Payload()))
|
||||
assert.Equal(t, "2", msg.Properties()[common.TraceIDKey])
|
||||
case <-time.After(2 * time.Second):
|
||||
assert.FailNow(t, "should not wait")
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
|
||||
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
|
||||
|
||||
|
@ -86,6 +87,7 @@ func TestComsumeCompressedMessage(t *testing.T) {
|
|||
|
||||
msg := []byte("test message")
|
||||
compressedMsg := []byte("test compressed message")
|
||||
traceValue := "test compressed message id"
|
||||
_, err = producer.Send(context.Background(), &mqwrapper.ProducerMessage{
|
||||
Payload: msg,
|
||||
Properties: map[string]string{},
|
||||
|
@ -97,14 +99,17 @@ func TestComsumeCompressedMessage(t *testing.T) {
|
|||
assert.Equal(t, msg, recvMsg.Payload())
|
||||
|
||||
_, err = compressProducer.Send(context.Background(), &mqwrapper.ProducerMessage{
|
||||
Payload: compressedMsg,
|
||||
Properties: map[string]string{},
|
||||
Payload: compressedMsg,
|
||||
Properties: map[string]string{
|
||||
common.TraceIDKey: traceValue,
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
recvMsg, err = consumer.Receive(context.Background())
|
||||
assert.NoError(t, err)
|
||||
consumer.Ack(recvMsg)
|
||||
assert.Equal(t, compressedMsg, recvMsg.Payload())
|
||||
assert.Equal(t, traceValue, recvMsg.Properties()[common.TraceIDKey])
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, consumer)
|
||||
|
|
|
@ -31,7 +31,7 @@ func (rm *rmqMessage) Topic() string {
|
|||
|
||||
// Properties returns the properties of rocksmq message
|
||||
func (rm *rmqMessage) Properties() map[string]string {
|
||||
return nil
|
||||
return rm.msg.Properties
|
||||
}
|
||||
|
||||
// Payload returns the payload of rocksmq message
|
||||
|
|
|
@ -33,7 +33,7 @@ func (rp *rmqProducer) Topic() string {
|
|||
|
||||
// Send send the producer messages to rocksmq
|
||||
func (rp *rmqProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
|
||||
pm := &client.ProducerMessage{Payload: message.Payload}
|
||||
pm := &client.ProducerMessage{Payload: message.Payload, Properties: message.Properties}
|
||||
id, err := rp.p.Send(pm)
|
||||
return &rmqID{messageID: id}, err
|
||||
}
|
||||
|
|
|
@ -29,9 +29,9 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
)
|
||||
|
||||
// ExtractFromPulsarMsgProperties extracts trace span from msg.properties.
|
||||
// ExtractFromMsgProperties extracts trace span from msg.properties.
|
||||
// And it will attach some default tags to the span.
|
||||
func ExtractFromPulsarMsgProperties(msg TsMsg, properties map[string]string) (opentracing.Span, bool) {
|
||||
func ExtractFromMsgProperties(msg TsMsg, properties map[string]string) (opentracing.Span, bool) {
|
||||
if !allowTrace(msg) {
|
||||
return trace.NoopSpan(), false
|
||||
}
|
||||
|
|
|
@ -186,13 +186,13 @@ func InfoFromContext(ctx context.Context) (traceID string, sampled, found bool)
|
|||
return "", false, false
|
||||
}
|
||||
|
||||
// InjectContextToPulsarMsgProperties is a method inject span to pulsr message.
|
||||
func InjectContextToPulsarMsgProperties(sc opentracing.SpanContext, properties map[string]string) {
|
||||
// InjectContextToMsgProperties is a method inject span to pulsr message.
|
||||
func InjectContextToMsgProperties(sc opentracing.SpanContext, properties map[string]string) {
|
||||
tracer := opentracing.GlobalTracer()
|
||||
tracer.Inject(sc, opentracing.TextMap, PropertiesReaderWriter{properties})
|
||||
}
|
||||
|
||||
// PropertiesReaderWriter is for saving trce in pulsar msg properties.
|
||||
// PropertiesReaderWriter is for saving trace in pulsar msg properties.
|
||||
// Implement Set and ForeachKey methods.
|
||||
type PropertiesReaderWriter struct {
|
||||
PpMap map[string]string
|
||||
|
|
|
@ -100,7 +100,7 @@ func TestInject(t *testing.T) {
|
|||
id, sampled, found := InfoFromContext(ctx)
|
||||
fmt.Printf("traceID = %s, sampled = %t, found = %t", id, sampled, found)
|
||||
pp := PropertiesReaderWriter{PpMap: map[string]string{}}
|
||||
InjectContextToPulsarMsgProperties(sp.Context(), pp.PpMap)
|
||||
InjectContextToMsgProperties(sp.Context(), pp.PpMap)
|
||||
tracer := opentracing.GlobalTracer()
|
||||
sc, _ := tracer.Extract(opentracing.TextMap, pp)
|
||||
assert.NotNil(t, sc)
|
||||
|
|
Loading…
Reference in New Issue