Add MessageID return value in producers (#8586)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/8598/head
congqixia 2021-09-26 17:38:07 +08:00 committed by GitHub
parent cbe8c03224
commit 353eeab8b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 68 additions and 48 deletions

View File

@ -246,7 +246,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
ms.producerLock.Lock()
if err := ms.producers[channel].Send(
if _, err := ms.producers[channel].Send(
spanCtx,
msg,
); err != nil {
@ -286,7 +286,7 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
ms.producerLock.Lock()
for _, producer := range ms.producers {
if err := producer.Send(
if _, err := producer.Send(
spanCtx,
msg,
); err != nil {

View File

@ -28,7 +28,7 @@ type Producer interface {
//Topic() string
// publish a message
Send(ctx context.Context, message *ProducerMessage) error
Send(ctx context.Context, message *ProducerMessage) (MessageID, error)
Close()
}

View File

@ -53,7 +53,7 @@ func Produce(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
Payload: IntToBytes(v),
Properties: map[string]string{},
}
err = producer.Send(ctx, msg)
_, err = producer.Send(ctx, msg)
assert.Nil(t, err)
log.Info("Pub", zap.Any("SND", v))
}

View File

@ -17,6 +17,9 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
)
// implementation assertion
var _ Producer = (*pulsarProducer)(nil)
type pulsarProducer struct {
p pulsar.Producer
}
@ -25,10 +28,10 @@ func (pp *pulsarProducer) Topic() string {
return pp.p.Topic()
}
func (pp *pulsarProducer) Send(ctx context.Context, message *ProducerMessage) error {
func (pp *pulsarProducer) Send(ctx context.Context, message *ProducerMessage) (MessageID, error) {
ppm := &pulsar.ProducerMessage{Payload: message.Payload, Properties: message.Properties}
_, err := pp.p.Send(ctx, ppm)
return err
pmID, err := pp.p.Send(ctx, ppm)
return &pulsarID{messageID: pmID}, err
}
func (pp *pulsarProducer) Close() {

View File

@ -38,7 +38,7 @@ func TestPulsarProducer(t *testing.T) {
Payload: []byte{},
Properties: map[string]string{},
}
err = producer.Send(context.TODO(), msg)
_, err = producer.Send(context.TODO(), msg)
assert.Nil(t, err)
pulsarProd.Close()

View File

@ -69,7 +69,7 @@ func TestRmqClient_CreateProducer(t *testing.T) {
Payload: []byte{},
Properties: nil,
}
err = rmqProducer.Send(context.TODO(), msg)
_, err = rmqProducer.Send(context.TODO(), msg)
assert.Nil(t, err)
invalidOpts := ProducerOptions{Topic: ""}
@ -114,7 +114,7 @@ func TestRmqClient_Subscribe(t *testing.T) {
Payload: []byte{1},
Properties: nil,
}
err = producer.Send(context.TODO(), msg)
_, err = producer.Send(context.TODO(), msg)
assert.Nil(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)

View File

@ -17,6 +17,8 @@ import (
"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
)
var _ Producer = (*rmqProducer)(nil)
type rmqProducer struct {
p rocksmq.Producer
}
@ -25,9 +27,10 @@ func (rp *rmqProducer) Topic() string {
return rp.p.Topic()
}
func (rp *rmqProducer) Send(ctx context.Context, message *ProducerMessage) error {
func (rp *rmqProducer) Send(ctx context.Context, message *ProducerMessage) (MessageID, error) {
pm := &rocksmq.ProducerMessage{Payload: message.Payload}
return rp.p.Send(pm)
id, err := rp.p.Send(pm)
return &rmqID{messageID: id}, err
}
func (rp *rmqProducer) Close() {

View File

@ -27,7 +27,7 @@ type Producer interface {
Topic() string
// publish a message
Send(message *ProducerMessage) error
Send(message *ProducerMessage) (UniqueID, error)
// Close a producer
Close()

View File

@ -17,6 +17,9 @@ import (
"go.uber.org/zap"
)
// assertion make sure implementation
var _ Producer = (*producer)(nil)
type producer struct {
// client which the producer belong to
c *client
@ -42,12 +45,16 @@ func (p *producer) Topic() string {
return p.topic
}
func (p *producer) Send(message *ProducerMessage) error {
return p.c.server.Produce(p.topic, []server.ProducerMessage{
func (p *producer) Send(message *ProducerMessage) (UniqueID, error) {
ids, err := p.c.server.Produce(p.topic, []server.ProducerMessage{
{
Payload: message.Payload,
},
})
if err != nil {
return 0, err
}
return ids[0], nil
}
func (p *producer) Close() {

View File

@ -40,7 +40,7 @@ type RocksMQ interface {
RegisterConsumer(consumer *Consumer)
Produce(topicName string, messages []ProducerMessage) error
Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error)
Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)
Seek(topicName string, groupName string, msgID UniqueID) error
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer)

View File

@ -382,14 +382,14 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
return nil
}
func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error {
func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) {
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("topic name = %s not exist", topicName)
return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return fmt.Errorf("get mutex failed, topic name = %s", topicName)
return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName)
}
lock.Lock()
defer lock.Unlock()
@ -399,11 +399,11 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
if err != nil {
log.Debug("RocksMQ: alloc id failed.")
return err
return []UniqueID{}, err
}
if UniqueID(msgLen) != idEnd-idStart {
return errors.New("Obtained id length is not equal that of message")
return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
}
/* Step I: Insert data to store system */
@ -415,7 +415,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
msgID := idStart + UniqueID(i)
key, err := combKey(topicName, msgID)
if err != nil {
return err
return []UniqueID{}, err
}
batch.Put([]byte(key), messages[i].Payload)
@ -428,7 +428,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
err = rmq.store.Write(opts, batch)
if err != nil {
log.Debug("RocksMQ: write batch failed")
return err
return []UniqueID{}, err
}
/* Step II: Update meta data to kv system */
@ -436,7 +436,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
beginIDValue, err := rmq.kv.Load(kvChannelBeginID)
if err != nil {
log.Debug("RocksMQ: load " + kvChannelBeginID + " failed")
return err
return []UniqueID{}, err
}
kvValues := make(map[string]string)
@ -452,7 +452,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
err = rmq.kv.MultiSave(kvValues)
if err != nil {
log.Debug("RocksMQ: multisave failed")
return err
return []UniqueID{}, err
}
if vals, ok := rmq.consumers.Load(topicName); ok {
@ -470,9 +470,9 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
// TODO(yukun): Should this be in a go routine
err = rmq.UpdatePageInfo(topicName, msgIDs, msgSizes)
if err != nil {
return err
return []UniqueID{}, err
}
return nil
return msgIDs, nil
}
func (rmq *rocksmq) UpdatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {

View File

@ -99,7 +99,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
pMsgs[0] = pMsgA
_ = idAllocator.UpdateID()
err = rmq.Produce(topicName, pMsgs)
_, err = rmq.Produce(topicName, pMsgs)
assert.Error(t, err)
rmq.Notify(topicName, groupName)
@ -152,7 +152,7 @@ func TestRocksmq(t *testing.T) {
pMsgs[0] = pMsgA
_ = idAllocator.UpdateID()
err = rmq.Produce(channelName, pMsgs)
_, err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
pMsgB := ProducerMessage{Payload: []byte("b_message")}
@ -161,7 +161,7 @@ func TestRocksmq(t *testing.T) {
pMsgs[0] = pMsgB
pMsgs = append(pMsgs, pMsgC)
_ = idAllocator.UpdateID()
err = rmq.Produce(channelName, pMsgs)
_, err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
groupName := "test_group"
@ -212,10 +212,10 @@ func TestRocksmq_Dummy(t *testing.T) {
err = rmq.DestroyConsumerGroup(channelName, channelName1)
assert.NoError(t, err)
err = rmq.Produce(channelName, nil)
_, err = rmq.Produce(channelName, nil)
assert.Error(t, err)
err = rmq.Produce(channelName1, nil)
_, err = rmq.Produce(channelName1, nil)
assert.Error(t, err)
groupName1 := "group_dummy"
@ -226,7 +226,7 @@ func TestRocksmq_Dummy(t *testing.T) {
channelName2 := strings.Repeat(channelName1, 100)
err = rmq.CreateTopic(string(channelName2))
assert.NoError(t, err)
err = rmq.Produce(string(channelName2), nil)
_, err = rmq.Produce(string(channelName2), nil)
assert.Error(t, err)
msgA := "a_message"
@ -238,7 +238,8 @@ func TestRocksmq_Dummy(t *testing.T) {
_, err = rmq.Consume(channelName, groupName1, 1)
assert.Error(t, err)
topicMu.Store(channelName, channelName)
assert.Error(t, rmq.Produce(channelName, nil))
_, err = rmq.Produce(channelName, nil)
assert.Error(t, err)
_, err = rmq.Consume(channelName, groupName1, 1)
assert.Error(t, err)
@ -275,7 +276,7 @@ func TestRocksmq_Loop(t *testing.T) {
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs := make([]ProducerMessage, 1)
pMsgs[0] = pMsg
err := rmq.Produce(channelName, pMsgs)
_, err := rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
}
@ -286,7 +287,7 @@ func TestRocksmq_Loop(t *testing.T) {
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(channelName, pMsgs)
_, err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
// Consume loopNum message once
@ -350,8 +351,9 @@ func TestRocksmq_Goroutines(t *testing.T) {
pMsgs[0] = pMsg0
pMsgs[1] = pMsg1
err := mq.Produce(channelName, pMsgs)
ids, err := mq.Produce(channelName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
msgChan <- msg0
msgChan <- msg1
}(i, rmq)
@ -413,8 +415,9 @@ func TestRocksmq_Throughout(t *testing.T) {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
assert.Nil(t, idAllocator.UpdateID())
err := rmq.Produce(channelName, []ProducerMessage{pMsg})
ids, err := rmq.Produce(channelName, []ProducerMessage{pMsg})
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
}
pt1 := time.Now().UnixNano() / int64(time.Millisecond)
pDuration := pt1 - pt0
@ -471,9 +474,9 @@ func TestRocksmq_MultiChan(t *testing.T) {
msg1 := "for_chann1_" + strconv.Itoa(i)
pMsg0 := ProducerMessage{Payload: []byte(msg0)}
pMsg1 := ProducerMessage{Payload: []byte(msg1)}
err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
_, err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
assert.Nil(t, err)
err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
assert.Nil(t, err)
}
@ -516,20 +519,20 @@ func TestRocksmq_CopyData(t *testing.T) {
msg0 := "abcde"
pMsg0 := ProducerMessage{Payload: []byte(msg0)}
err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
_, err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
assert.Nil(t, err)
pMsg1 := ProducerMessage{Payload: nil}
err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
assert.Nil(t, err)
pMsg2 := ProducerMessage{Payload: []byte{}}
err = rmq.Produce(channelName1, []ProducerMessage{pMsg2})
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg2})
assert.Nil(t, err)
var emptyTargetData []byte
pMsg3 := ProducerMessage{Payload: emptyTargetData}
err = rmq.Produce(channelName1, []ProducerMessage{pMsg3})
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg3})
assert.Nil(t, err)
groupName := "test_group"

View File

@ -73,8 +73,9 @@ func TestRmqRetention(t *testing.T) {
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(topicName, pMsgs)
ids, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
@ -159,8 +160,9 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) {
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(topicName, pMsgs)
ids, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
var wg sync.WaitGroup
wg.Add(1)
@ -321,8 +323,9 @@ func TestRmqRetention_Complex(t *testing.T) {
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(topicName, pMsgs)
ids, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
@ -385,8 +388,9 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(topicName, pMsgs)
ids, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids))
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)