enhance: enable rmq for streaming (#38669)

issue: #38399

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/38726/head
Zhen Ye 2024-12-24 20:24:48 +08:00 committed by GitHub
parent 2b53b0905e
commit 69a9fd6ead
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 207 additions and 37 deletions

View File

@ -78,6 +78,8 @@ type Scanner interface {
// WALAccesser is the interfaces to interact with the milvus write ahead log.
type WALAccesser interface {
WALName() string
// Txn returns a transaction for writing records to the log.
// Once the txn is returned, the Commit or Rollback operation must be called once, otherwise resource leak on wal.
Txn(ctx context.Context, opts TxnOption) (Txn, error)

View File

@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/client"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/conc"
@ -54,6 +55,10 @@ type walAccesserImpl struct {
dispatchExecutionPool *conc.Pool[struct{}]
}
func (w *walAccesserImpl) WALName() string {
return util.MustSelectWALName()
}
// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertValidMessage(msg)

View File

@ -331,6 +331,51 @@ func (_c *MockWALAccesser_Txn_Call) RunAndReturn(run func(context.Context, strea
return _c
}
// WALName provides a mock function with given fields:
func (_m *MockWALAccesser) WALName() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for WALName")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockWALAccesser_WALName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WALName'
type MockWALAccesser_WALName_Call struct {
*mock.Call
}
// WALName is a helper method to define mock.On call
func (_e *MockWALAccesser_Expecter) WALName() *MockWALAccesser_WALName_Call {
return &MockWALAccesser_WALName_Call{Call: _e.mock.On("WALName")}
}
func (_c *MockWALAccesser_WALName_Call) Run(run func()) *MockWALAccesser_WALName_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWALAccesser_WALName_Call) Return(_a0 string) *MockWALAccesser_WALName_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWALAccesser_WALName_Call) RunAndReturn(run func() string) *MockWALAccesser_WALName_Call {
_c.Call.Return(run)
return _c
}
// NewMockWALAccesser creates a new instance of MockWALAccesser. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockWALAccesser(t interface {

View File

@ -735,7 +735,7 @@ func (sd *shardDelegator) createDeleteStreamFromStreamingService(ctx context.Con
s := streaming.WAL().Read(ctx, streaming.ReadOption{
VChannel: position.GetChannelName(),
DeliverPolicy: options.DeliverPolicyStartFrom(
adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID()),
adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID()),
),
DeliverFilters: []options.DeliverFilter{
// only deliver message which timestamp >= position.Timestamp

View File

@ -95,7 +95,7 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M
}
if streamingutil.IsStreamingServiceEnabled() {
startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID())
startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID())
log.Info(
"stream pipeline seeks from position with scanner",
zap.String("channel", position.GetChannelName()),

View File

@ -2,14 +2,13 @@ package util
import (
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
walTypeDefault = "default"
walTypeNatsmq = "natsmq"
walTypeRocksmq = "rocksmq"
walTypeKafka = "kafka"
walTypePulsar = "pulsar"
@ -17,25 +16,16 @@ const (
type walEnable struct {
Rocksmq bool
Natsmq bool
Pulsar bool
Kafka bool
}
var isStandAlone = atomic.NewBool(false)
// EnableStandAlone enable standalone mode.
func EnableStandAlone(standalone bool) {
isStandAlone.Store(standalone)
}
// MustSelectWALName select wal name.
func MustSelectWALName() string {
standalone := isStandAlone.Load()
params := paramtable.Get()
standalone := params.RuntimeConfig.Role.GetAsString() == typeutil.StandaloneRole
return mustSelectWALName(standalone, params.MQCfg.Type.GetValue(), walEnable{
params.RocksmqEnable(),
params.NatsmqEnable(),
params.PulsarEnable(),
params.KafkaEnable(),
})
@ -68,8 +58,8 @@ func validateWALName(standalone bool, mqType string) error {
// we may register more mq type by plugin.
// so we should not check all mq type here.
// only check standalone type.
if !standalone && (mqType == walTypeRocksmq || mqType == walTypeNatsmq) {
return errors.Newf("mq %s is only valid in standalone mode")
if !standalone && mqType == walTypeRocksmq {
return errors.Newf("mq %s is only valid in standalone mode", mqType)
}
return nil
}

View File

@ -7,27 +7,24 @@ import (
)
func TestValidateWALType(t *testing.T) {
assert.Error(t, validateWALName(false, walTypeNatsmq))
assert.Error(t, validateWALName(false, walTypeRocksmq))
}
func TestSelectWALType(t *testing.T) {
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, false}) })
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, false}) })
assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeNatsmq, walEnable{true, true, true, true}), walTypeNatsmq)
assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true, true}) })
assert.Panics(t, func() { mustSelectWALName(false, walTypeNatsmq, walEnable{true, true, true, true}) })
assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false}) })
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false}) })
assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true}) })
assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true}), walTypeKafka)
}

View File

@ -15,6 +15,7 @@ import (
"context"
"reflect"
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
@ -139,6 +140,7 @@ func (c *client) consume(consumer *consumer) {
var consumerCh chan<- common.Message
var waitForSent *RmqMessage
var newIncomingMsgCh <-chan struct{}
var timerNotify <-chan time.Time
if len(pendingMsgs) > 0 {
// If there's pending sent messages, we can try to deliver them first.
consumerCh = consumer.messageCh
@ -148,6 +150,9 @@ func (c *client) consume(consumer *consumer) {
// !!! TODO: MsgMutex may lost, not sync up with the consumer,
// so the tailing message cannot be consumed if no new producing message.
newIncomingMsgCh = consumer.MsgMutex()
// It's a bad implementation here, for quickly fixing the previous problem.
// Every 100ms, wake up and check if the consumer has new incoming data.
timerNotify = time.After(100 * time.Millisecond)
}
select {
@ -162,6 +167,8 @@ func (c *client) consume(consumer *consumer) {
log.Info("Consumer MsgMutex closed")
return
}
case <-timerNotify:
continue
}
}
}
@ -191,6 +198,17 @@ func (c *client) tryToConsume(consumer *consumer) []*RmqMessage {
}
rmqMsgs := make([]*RmqMessage, 0, len(msgs))
for _, msg := range msgs {
rmqMsg, err := unmarshalStreamingMessage(consumer.topic, msg)
if err == nil {
rmqMsgs = append(rmqMsgs, rmqMsg)
continue
}
if !errors.Is(err, errNotStreamingServiceMessage) {
log.Warn("Consumer's goroutine cannot unmarshal streaming message: ", zap.Error(err))
continue
}
// then fallback to the legacy message format.
// This is the hack, we put property into pl
properties := make(map[string]string, 0)
pl, err := UnmarshalHeader(msg.Payload)

View File

@ -28,6 +28,9 @@ type Producer interface {
// publish a message
Send(message *common.ProducerMessage) (UniqueID, error)
// publish a message for new streaming service.
SendForStreamingService(message *common.ProducerMessage) (UniqueID, error)
// Close a producer
Close()
}

View File

@ -76,6 +76,20 @@ func (p *producer) Send(message *common.ProducerMessage) (UniqueID, error) {
return ids[0], nil
}
func (p *producer) SendForStreamingService(message *common.ProducerMessage) (UniqueID, error) {
payload, err := marshalStreamingMessage(message)
if err != nil {
return 0, err
}
ids, err := p.c.server.Produce(p.topic, []server.ProducerMessage{{
Payload: payload,
}})
if err != nil {
return 0, err
}
return ids[0], nil
}
// Close destroy the topic of this producer in rocksmq
func (p *producer) Close() {
err := p.c.server.DestroyTopic(p.topic)

View File

@ -0,0 +1,53 @@
package client
import (
"bytes"
"github.com/cockroachdb/errors"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/streaming/proto/messagespb"
)
var (
// magicPrefix is used to identify the rocksmq legacy message and new message for streaming service.
// Make a low probability of collision with the legacy proto message.
magicPrefix = append([]byte{0xFF, 0xFE, 0xFD, 0xFC}, []byte("STREAM")...)
errNotStreamingServiceMessage = errors.New("not a streaming service message")
)
// marshalStreamingMessage marshals a streaming message to bytes.
func marshalStreamingMessage(message *common.ProducerMessage) ([]byte, error) {
rmqMessage := &messagespb.RMQMessageLayout{
Payload: message.Payload,
Properties: message.Properties,
}
payload, err := proto.Marshal(rmqMessage)
if err != nil {
return nil, err
}
finalPayload := make([]byte, len(payload)+len(magicPrefix))
copy(finalPayload, magicPrefix)
copy(finalPayload[len(magicPrefix):], payload)
return finalPayload, nil
}
// unmarshalStreamingMessage unmarshals a streaming message from bytes.
func unmarshalStreamingMessage(topic string, msg server.ConsumerMessage) (*RmqMessage, error) {
if !bytes.HasPrefix(msg.Payload, magicPrefix) {
return nil, errNotStreamingServiceMessage
}
var rmqMessage messagespb.RMQMessageLayout
if err := proto.Unmarshal(msg.Payload[len(magicPrefix):], &rmqMessage); err != nil {
return nil, err
}
return &RmqMessage{
msgID: msg.MsgID,
payload: rmqMessage.Payload,
properties: rmqMessage.Properties,
topic: topic,
}, nil
}

View File

@ -0,0 +1,36 @@
package client
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
)
func TestStreaming(t *testing.T) {
payload, err := marshalStreamingMessage(&common.ProducerMessage{
Payload: []byte("payload"),
Properties: map[string]string{
"key": "value",
},
})
assert.NoError(t, err)
assert.NotNil(t, payload)
msg, err := unmarshalStreamingMessage("topic", server.ConsumerMessage{
MsgID: 1,
Payload: payload,
})
assert.NoError(t, err)
assert.Equal(t, string(msg.Payload()), "payload")
assert.Equal(t, msg.Properties()["key"], "value")
msg, err = unmarshalStreamingMessage("topic", server.ConsumerMessage{
MsgID: 1,
Payload: payload[1:],
})
assert.Error(t, err)
assert.ErrorIs(t, err, errNotStreamingServiceMessage)
assert.Nil(t, msg)
}

View File

@ -242,3 +242,9 @@ enum TxnState {
// the transaction is rollbacked.
TxnRollbacked = 6;
}
// RMQMessageLayout is the layout of message for RMQ.
message RMQMessageLayout {
bytes payload = 1; // message body
map<string, string> properties = 2; // message properties
}

View File

@ -8,6 +8,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/registry"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -35,5 +36,5 @@ func TestRegistry(t *testing.T) {
}
func TestWAL(t *testing.T) {
// walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run()
walimpls.NewWALImplsTestFramework(t, 1000, &builderImpl{}).Run()
}

View File

@ -30,7 +30,7 @@ func (w *walImpl) WALName() string {
// Append appends a message to the wal.
func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
id, err := w.p.Send(&common.ProducerMessage{
id, err := w.p.SendForStreamingService(&common.ProducerMessage{
Payload: msg.Payload(),
Properties: msg.Properties().ToRawMap(),
})