mirror of https://github.com/milvus-io/milvus.git
Refine msgstream interface (#20832)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com> Signed-off-by: yun.zhang <yun.zhang@zilliz.com>pull/20845/head
parent
548e90ec68
commit
039e9ce4bb
|
@ -492,7 +492,6 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
|
|||
log.Info("DataCoord creates the timetick channel consumer",
|
||||
zap.String("timeTickChannel", Params.CommonCfg.DataCoordTimeTick),
|
||||
zap.String("subscription", subName))
|
||||
ttMsgStream.Start()
|
||||
|
||||
go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream)
|
||||
}
|
||||
|
|
|
@ -1487,7 +1487,6 @@ func TestDataNodeTtChannel(t *testing.T) {
|
|||
ttMsgStream, err := svr.factory.NewMsgStream(context.TODO())
|
||||
assert.Nil(t, err)
|
||||
ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
|
||||
ttMsgStream.Start()
|
||||
defer ttMsgStream.Close()
|
||||
info := &NodeInfo{
|
||||
Address: "localhost:7777",
|
||||
|
@ -1555,7 +1554,6 @@ func TestDataNodeTtChannel(t *testing.T) {
|
|||
ttMsgStream, err := svr.factory.NewMsgStream(context.TODO())
|
||||
assert.Nil(t, err)
|
||||
ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
|
||||
ttMsgStream.Start()
|
||||
defer ttMsgStream.Close()
|
||||
info := &NodeInfo{
|
||||
Address: "localhost:7777",
|
||||
|
@ -1637,7 +1635,6 @@ func TestDataNodeTtChannel(t *testing.T) {
|
|||
ttMsgStream, err := svr.factory.NewMsgStream(context.TODO())
|
||||
assert.Nil(t, err)
|
||||
ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
|
||||
ttMsgStream.Start()
|
||||
defer ttMsgStream.Close()
|
||||
node := &NodeInfo{
|
||||
NodeID: 0,
|
||||
|
|
|
@ -293,13 +293,12 @@ func TestDataNode(t *testing.T) {
|
|||
insertStream, err := factory.NewMsgStream(node1.ctx)
|
||||
assert.NoError(t, err)
|
||||
insertStream.AsProducer([]string{dmChannelName})
|
||||
insertStream.Start()
|
||||
defer insertStream.Close()
|
||||
|
||||
err = insertStream.Broadcast(&timeTickMsgPack)
|
||||
_, err = insertStream.Broadcast(&timeTickMsgPack)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = insertStream.Broadcast(&timeTickMsgPack)
|
||||
_, err = insertStream.Broadcast(&timeTickMsgPack)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
|
|
|
@ -304,17 +304,14 @@ func TestDataSyncService_Start(t *testing.T) {
|
|||
ddStream.AsProducer([]string{ddlChannelName})
|
||||
|
||||
var insertMsgStream msgstream.MsgStream = insertStream
|
||||
insertMsgStream.Start()
|
||||
|
||||
var ddMsgStream msgstream.MsgStream = ddStream
|
||||
ddMsgStream.Start()
|
||||
|
||||
err = insertMsgStream.Produce(&msgPack)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = insertMsgStream.Broadcast(&timeTickMsgPack)
|
||||
_, err = insertMsgStream.Broadcast(&timeTickMsgPack)
|
||||
assert.NoError(t, err)
|
||||
err = ddMsgStream.Broadcast(&timeTickMsgPack)
|
||||
_, err = ddMsgStream.Broadcast(&timeTickMsgPack)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// dataSync
|
||||
|
@ -487,10 +484,6 @@ func TestGetChannelLatestMsgID(t *testing.T) {
|
|||
|
||||
insertStream, _ := factory.NewMsgStream(ctx)
|
||||
insertStream.AsProducer([]string{dmlChannelName})
|
||||
|
||||
var insertMsgStream = insertStream
|
||||
insertMsgStream.Start()
|
||||
|
||||
id, err := dsService.getChannelLatestMsgID(ctx, dmlChannelName, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, id)
|
||||
|
|
|
@ -369,7 +369,6 @@ func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppe
|
|||
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
|
||||
log.Info("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName))
|
||||
var deltaMsgStream msgstream.MsgStream = deltaStream
|
||||
deltaMsgStream.Start()
|
||||
|
||||
dd := &ddNode{
|
||||
ctx: ctx,
|
||||
|
|
|
@ -65,7 +65,6 @@ func (mm *mockMsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([
|
|||
type mockTtMsgStream struct {
|
||||
}
|
||||
|
||||
func (mtm *mockTtMsgStream) Start() {}
|
||||
func (mtm *mockTtMsgStream) Close() {}
|
||||
func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
|
||||
return make(chan *msgstream.MsgPack, 100)
|
||||
|
@ -75,9 +74,6 @@ func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
|
|||
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
|
||||
}
|
||||
func (mtm *mockTtMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}
|
||||
func (mtm *mockTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {
|
||||
return make([][]int32, 0)
|
||||
}
|
||||
|
||||
func (mtm *mockTtMsgStream) GetProduceChannels() []string {
|
||||
return make([]string, 0)
|
||||
|
@ -85,14 +81,8 @@ func (mtm *mockTtMsgStream) GetProduceChannels() []string {
|
|||
func (mtm *mockTtMsgStream) Produce(*msgstream.MsgPack) error {
|
||||
return nil
|
||||
}
|
||||
func (mtm *mockTtMsgStream) ProduceMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
return map[string][]msgstream.MessageID{}, nil
|
||||
}
|
||||
func (mtm *mockTtMsgStream) Broadcast(*msgstream.MsgPack) error {
|
||||
return nil
|
||||
}
|
||||
func (mtm *mockTtMsgStream) BroadcastMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
return map[string][]msgstream.MessageID{}, nil
|
||||
func (mtm *mockTtMsgStream) Broadcast(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error {
|
||||
return nil
|
||||
|
|
|
@ -615,7 +615,6 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
|
|||
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
|
||||
log.Info("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick))
|
||||
var wTtMsgStream msgstream.MsgStream = wTt
|
||||
wTtMsgStream.Start()
|
||||
|
||||
mt := newMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error {
|
||||
stats := make([]*datapb.SegmentStats, 0, len(segmentIDs))
|
||||
|
|
|
@ -15,10 +15,6 @@ func (m MockMsgStream) AsProducer(channels []string) {
|
|||
m.AsProducerFunc(channels)
|
||||
}
|
||||
|
||||
func (m MockMsgStream) BroadcastMark(pack *MsgPack) (map[string][]MessageID, error) {
|
||||
func (m MockMsgStream) Broadcast(pack *MsgPack) (map[string][]MessageID, error) {
|
||||
return m.BroadcastMarkFunc(pack)
|
||||
}
|
||||
|
||||
func (m MockMsgStream) Broadcast(pack *MsgPack) error {
|
||||
return m.BroadcastFunc(pack)
|
||||
}
|
||||
|
|
|
@ -150,7 +150,6 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
|
|||
|
||||
err = outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})
|
||||
assert.Nil(t, err)
|
||||
outputStream2.Start()
|
||||
|
||||
cnt := 0
|
||||
var value int64 = 6
|
||||
|
@ -223,21 +222,21 @@ func TestStream_KafkaTtMsgStream_Seek(t *testing.T) {
|
|||
inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels)
|
||||
outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack2)
|
||||
_, err = inputStream.Broadcast(&msgPack2)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack3)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack4)
|
||||
_, err = inputStream.Broadcast(&msgPack4)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack5)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack6)
|
||||
_, err = inputStream.Broadcast(&msgPack6)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack7)
|
||||
_, err = inputStream.Broadcast(&msgPack7)
|
||||
assert.Nil(t, err)
|
||||
|
||||
receivedMsg := consumer(ctx, outputStream)
|
||||
|
@ -413,7 +412,6 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {
|
|||
kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress)
|
||||
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
|
||||
outputStream.Start()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
@ -460,7 +458,6 @@ func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChann
|
|||
for _, opt := range opts {
|
||||
inputStream.SetRepackFunc(opt)
|
||||
}
|
||||
inputStream.Start()
|
||||
return inputStream
|
||||
}
|
||||
|
||||
|
@ -469,7 +466,6 @@ func getKafkaOutputStream(ctx context.Context, kafkaAddress string, consumerChan
|
|||
kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress)
|
||||
outputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, position)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
||||
|
@ -478,7 +474,6 @@ func getKafkaTtOutputStream(ctx context.Context, kafkaAddress string, consumerCh
|
|||
kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress)
|
||||
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
||||
|
@ -492,6 +487,5 @@ func getKafkaTtOutputStreamAndSeek(ctx context.Context, kafkaAddress string, pos
|
|||
}
|
||||
outputStream.AsConsumer(consumerName, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown)
|
||||
outputStream.Seek(positions)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
|
|
@ -175,9 +175,6 @@ func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
|
|||
ms.repackFunc = repackFunc
|
||||
}
|
||||
|
||||
func (ms *mqMsgStream) Start() {
|
||||
}
|
||||
|
||||
func (ms *mqMsgStream) Close() {
|
||||
log.Info("start to close mq msg stream",
|
||||
zap.Int("producer num", len(ms.producers)),
|
||||
|
@ -205,7 +202,7 @@ func (ms *mqMsgStream) Close() {
|
|||
|
||||
}
|
||||
|
||||
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
|
||||
func (ms *mqMsgStream) computeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
|
||||
if len(tsMsgs) <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -239,7 +236,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
|
|||
return errors.New("nil producer in msg stream")
|
||||
}
|
||||
tsMsgs := msgPack.Msgs
|
||||
reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
|
||||
reBucketValues := ms.computeProduceChannelIndexes(msgPack.Msgs)
|
||||
var result map[int32]*MsgPack
|
||||
var err error
|
||||
if ms.repackFunc != nil {
|
||||
|
@ -294,119 +291,9 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ProduceMark send msg pack to all producers and returns corresponding msg id
|
||||
// the returned message id serves as marking
|
||||
func (ms *mqMsgStream) ProduceMark(msgPack *MsgPack) (map[string][]MessageID, error) {
|
||||
ids := make(map[string][]MessageID)
|
||||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
return ids, errors.New("empty msgs")
|
||||
}
|
||||
if len(ms.producers) <= 0 {
|
||||
return ids, errors.New("nil producer in msg stream")
|
||||
}
|
||||
tsMsgs := msgPack.Msgs
|
||||
reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
|
||||
var result map[int32]*MsgPack
|
||||
var err error
|
||||
if ms.repackFunc != nil {
|
||||
result, err = ms.repackFunc(tsMsgs, reBucketValues)
|
||||
} else {
|
||||
msgType := (tsMsgs[0]).Type()
|
||||
switch msgType {
|
||||
case commonpb.MsgType_Insert:
|
||||
result, err = InsertRepackFunc(tsMsgs, reBucketValues)
|
||||
case commonpb.MsgType_Delete:
|
||||
result, err = DeleteRepackFunc(tsMsgs, reBucketValues)
|
||||
default:
|
||||
result, err = DefaultRepackFunc(tsMsgs, reBucketValues)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return ids, err
|
||||
}
|
||||
for k, v := range result {
|
||||
channel := ms.producerChannels[k]
|
||||
for i, tsMsg := range v.Msgs {
|
||||
sp, spanCtx := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), tsMsg)
|
||||
|
||||
mb, err := tsMsg.Marshal(tsMsg)
|
||||
if err != nil {
|
||||
return ids, err
|
||||
}
|
||||
|
||||
m, err := convertToByteArray(mb)
|
||||
if err != nil {
|
||||
return ids, err
|
||||
}
|
||||
|
||||
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
|
||||
trace.InjectContextToMsgProperties(sp.Context(), msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
id, err := ms.producers[channel].Send(
|
||||
spanCtx,
|
||||
msg,
|
||||
)
|
||||
if err != nil {
|
||||
ms.producerLock.Unlock()
|
||||
trace.LogError(sp, err)
|
||||
sp.Finish()
|
||||
return ids, err
|
||||
}
|
||||
ids[channel] = append(ids[channel], id)
|
||||
sp.Finish()
|
||||
ms.producerLock.Unlock()
|
||||
}
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
// Broadcast put msgPack to all producer in current msgstream
|
||||
// Broadcast put msgPack to all producer in current msgstream and returned message id
|
||||
// which ignores repackFunc logic
|
||||
func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
|
||||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
log.Debug("Warning: Receive empty msgPack")
|
||||
return nil
|
||||
}
|
||||
for _, v := range msgPack.Msgs {
|
||||
sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v)
|
||||
|
||||
mb, err := v.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err := convertToByteArray(mb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
|
||||
trace.InjectContextToMsgProperties(sp.Context(), msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
for _, producer := range ms.producers {
|
||||
if _, err := producer.Send(
|
||||
spanCtx,
|
||||
msg,
|
||||
); err != nil {
|
||||
ms.producerLock.Unlock()
|
||||
trace.LogError(sp, err)
|
||||
sp.Finish()
|
||||
return err
|
||||
}
|
||||
}
|
||||
ms.producerLock.Unlock()
|
||||
sp.Finish()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BroadcastMark broadcast msg pack to all producers and returns corresponding msg id
|
||||
// the returned message id serves as marking
|
||||
func (ms *mqMsgStream) BroadcastMark(msgPack *MsgPack) (map[string][]MessageID, error) {
|
||||
func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, error) {
|
||||
ids := make(map[string][]MessageID)
|
||||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
return ids, errors.New("empty msgs")
|
||||
|
@ -653,9 +540,6 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string, position
|
|||
}
|
||||
}
|
||||
|
||||
// Start will start a goroutine which keep carrying msg from pulsar/rocksmq to golang chan
|
||||
func (ms *MqTtMsgStream) Start() {}
|
||||
|
||||
// Close will stop goroutine and free internal producers and consumers
|
||||
func (ms *MqTtMsgStream) Close() {
|
||||
close(ms.syncConsumer)
|
||||
|
|
|
@ -202,7 +202,7 @@ func TestMqMsgStream_ComputeProduceChannelIndexes(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
// empty parameters
|
||||
reBucketValues := m.ComputeProduceChannelIndexes([]TsMsg{})
|
||||
reBucketValues := m.computeProduceChannelIndexes([]TsMsg{})
|
||||
assert.Nil(t, reBucketValues)
|
||||
|
||||
// not called AsProducer yet
|
||||
|
@ -228,7 +228,7 @@ func TestMqMsgStream_ComputeProduceChannelIndexes(t *testing.T) {
|
|||
RowData: []*commonpb.Blob{},
|
||||
},
|
||||
}
|
||||
reBucketValues = m.ComputeProduceChannelIndexes([]TsMsg{insertMsg})
|
||||
reBucketValues = m.computeProduceChannelIndexes([]TsMsg{insertMsg})
|
||||
assert.Nil(t, reBucketValues)
|
||||
}(parameters[i].client)
|
||||
}
|
||||
|
@ -312,8 +312,8 @@ func TestMqMsgStream_Broadcast(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
// Broadcast nil pointer
|
||||
err = m.Broadcast(nil)
|
||||
assert.Nil(t, err)
|
||||
_, err = m.Broadcast(nil)
|
||||
assert.NotNil(t, err)
|
||||
}(parameters[i].client)
|
||||
}
|
||||
}
|
||||
|
@ -481,7 +481,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
|
|||
inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack)
|
||||
_, err := inputStream.Broadcast(&msgPack)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
|
||||
|
||||
receiveMsg(ctx, outputStream, len(consumerChannels)*len(msgPack.Msgs))
|
||||
|
@ -552,12 +552,10 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
|
|||
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
inputStream.AsProducer(producerChannels)
|
||||
inputStream.Start()
|
||||
|
||||
pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
var output MsgStream = outputStream
|
||||
|
||||
err := (*inputStream).Produce(&msgPack)
|
||||
|
@ -607,12 +605,10 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
|
|||
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
inputStream.AsProducer(producerChannels)
|
||||
inputStream.Start()
|
||||
|
||||
pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
var output MsgStream = outputStream
|
||||
|
||||
err := (*inputStream).Produce(&msgPack)
|
||||
|
@ -641,12 +637,10 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
|
|||
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
inputStream.AsProducer(producerChannels)
|
||||
inputStream.Start()
|
||||
|
||||
pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
var output MsgStream = outputStream
|
||||
|
||||
err := (*inputStream).Produce(&msgPack)
|
||||
|
@ -677,13 +671,13 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
|
|||
inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
|
||||
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
|
||||
|
||||
err = inputStream.Broadcast(&msgPack2)
|
||||
_, err = inputStream.Broadcast(&msgPack2)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
|
||||
|
||||
receiveMsg(ctx, outputStream, len(msgPack1.Msgs))
|
||||
|
@ -722,17 +716,17 @@ func TestStream_PulsarTtMsgStream_NoSeek(t *testing.T) {
|
|||
inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack2)
|
||||
_, err = inputStream.Broadcast(&msgPack2)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack3)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack4)
|
||||
_, err = inputStream.Broadcast(&msgPack4)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack5)
|
||||
_, err = inputStream.Broadcast(&msgPack5)
|
||||
assert.Nil(t, err)
|
||||
|
||||
o1 := consumer(ctx, outputStream)
|
||||
|
@ -802,7 +796,6 @@ func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) {
|
|||
|
||||
err = outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})
|
||||
assert.Nil(t, err)
|
||||
outputStream2.Start()
|
||||
|
||||
cnt := 0
|
||||
var value int64 = 6
|
||||
|
@ -874,21 +867,21 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
|
|||
inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack2)
|
||||
_, err = inputStream.Broadcast(&msgPack2)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack3)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack4)
|
||||
_, err = inputStream.Broadcast(&msgPack4)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack5)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack6)
|
||||
_, err = inputStream.Broadcast(&msgPack6)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack7)
|
||||
_, err = inputStream.Broadcast(&msgPack7)
|
||||
assert.Nil(t, err)
|
||||
|
||||
receivedMsg := consumer(ctx, outputStream)
|
||||
|
@ -968,13 +961,13 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
|
|||
inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
|
||||
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
|
||||
|
||||
err = inputStream.Broadcast(&msgPack2)
|
||||
_, err = inputStream.Broadcast(&msgPack2)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
|
||||
|
||||
receiveMsg(ctx, outputStream, len(msgPack1.Msgs))
|
||||
|
@ -1025,7 +1018,7 @@ func sendMsgPacks(ms MsgStream, msgPacks []*MsgPack) error {
|
|||
}
|
||||
} else {
|
||||
// tt msg use Broadcast
|
||||
if err := ms.Broadcast(msgPacks[i]); err != nil {
|
||||
if _, err := ms.Broadcast(msgPacks[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -1033,23 +1026,22 @@ func sendMsgPacks(ms MsgStream, msgPacks []*MsgPack) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// This testcase will generate MsgPacks as following:
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// c1 |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// c1 |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// c2 |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// c2 |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
// Then check:
|
||||
// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be
|
||||
// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs
|
||||
// 2. The count of consumed msg should be equal to the count of produced msg
|
||||
//
|
||||
// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be
|
||||
// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs
|
||||
// 2. The count of consumed msg should be equal to the count of produced msg
|
||||
func TestStream_PulsarTtMsgStream_1(t *testing.T) {
|
||||
pulsarAddr := getPulsarAddress()
|
||||
c1 := funcutil.RandomString(8)
|
||||
|
@ -1097,22 +1089,25 @@ func TestStream_PulsarTtMsgStream_1(t *testing.T) {
|
|||
outputStream.Close()
|
||||
}
|
||||
|
||||
//
|
||||
// This testcase will generate MsgPacks as following:
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
//
|
||||
// c1 |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
//
|
||||
// c2 |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
// Then check:
|
||||
// 1. ttMsgStream consumer can seek to the right position and resume
|
||||
// 2. The count of consumed msg should be equal to the count of produced msg
|
||||
//
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
//
|
||||
// Then check:
|
||||
// 1. ttMsgStream consumer can seek to the right position and resume
|
||||
// 2. The count of consumed msg should be equal to the count of produced msg
|
||||
func TestStream_PulsarTtMsgStream_2(t *testing.T) {
|
||||
pulsarAddr := getPulsarAddress()
|
||||
c1 := funcutil.RandomString(8)
|
||||
|
@ -1203,7 +1198,6 @@ func TestStream_MqMsgStream_Seek(t *testing.T) {
|
|||
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})
|
||||
outputStream2.Start()
|
||||
|
||||
for i := 6; i < 10; i++ {
|
||||
result := consumer(ctx, outputStream2)
|
||||
|
@ -1261,7 +1255,6 @@ func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) {
|
|||
|
||||
err = outputStream2.Seek(p)
|
||||
assert.Nil(t, err)
|
||||
outputStream2.Start()
|
||||
|
||||
for i := 10; i < 20; i++ {
|
||||
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
|
||||
|
@ -1318,7 +1311,6 @@ func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) {
|
|||
|
||||
err = outputStream2.Seek(p)
|
||||
assert.Nil(t, err)
|
||||
outputStream2.Start()
|
||||
|
||||
for i := 10; i < 20; i++ {
|
||||
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
|
||||
|
@ -1356,7 +1348,6 @@ func TestStream_MqMsgStream_SeekLatest(t *testing.T) {
|
|||
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
|
||||
outputStream2.Start()
|
||||
|
||||
msgPack.Msgs = nil
|
||||
// produce another 10 tsMs
|
||||
|
@ -1423,13 +1414,11 @@ func initRmqStream(ctx context.Context,
|
|||
for _, opt := range opts {
|
||||
inputStream.SetRepackFunc(opt)
|
||||
}
|
||||
inputStream.Start()
|
||||
var input MsgStream = inputStream
|
||||
|
||||
rmqClient2, _ := rmq.NewClientWithDefaultOptions()
|
||||
outputStream, _ := NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
var output MsgStream = outputStream
|
||||
|
||||
return input, output
|
||||
|
@ -1448,13 +1437,11 @@ func initRmqTtStream(ctx context.Context,
|
|||
for _, opt := range opts {
|
||||
inputStream.SetRepackFunc(opt)
|
||||
}
|
||||
inputStream.Start()
|
||||
var input MsgStream = inputStream
|
||||
|
||||
rmqClient2, _ := rmq.NewClientWithDefaultOptions()
|
||||
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
var output MsgStream = outputStream
|
||||
|
||||
return input, output
|
||||
|
@ -1500,13 +1487,13 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
|
||||
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
|
||||
|
||||
err = inputStream.Broadcast(&msgPack2)
|
||||
_, err = inputStream.Broadcast(&msgPack2)
|
||||
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
|
||||
|
||||
receiveMsg(ctx, outputStream, len(msgPack1.Msgs))
|
||||
|
@ -1541,13 +1528,13 @@ func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack2)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack3)
|
||||
_, err = inputStream.Broadcast(&msgPack3)
|
||||
assert.Nil(t, err)
|
||||
|
||||
receivedMsg := consumer(ctx, outputStream)
|
||||
|
@ -1564,7 +1551,6 @@ func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) {
|
|||
consumerSubName = funcutil.RandomString(8)
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown)
|
||||
outputStream.Seek(receivedMsg.StartPositions)
|
||||
outputStream.Start()
|
||||
seekMsg := consumer(ctx, outputStream)
|
||||
assert.Equal(t, len(seekMsg.Msgs), 1+2)
|
||||
assert.EqualValues(t, seekMsg.Msgs[0].BeginTs(), 1)
|
||||
|
@ -1614,21 +1600,21 @@ func TestStream_RmqTtMsgStream_Seek(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
_, err := inputStream.Broadcast(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack2)
|
||||
_, err = inputStream.Broadcast(&msgPack2)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack3)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack4)
|
||||
_, err = inputStream.Broadcast(&msgPack4)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack5)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack6)
|
||||
_, err = inputStream.Broadcast(&msgPack6)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Broadcast(&msgPack7)
|
||||
_, err = inputStream.Broadcast(&msgPack7)
|
||||
assert.Nil(t, err)
|
||||
|
||||
receivedMsg := consumer(ctx, outputStream)
|
||||
|
@ -1670,7 +1656,6 @@ func TestStream_RmqTtMsgStream_Seek(t *testing.T) {
|
|||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown)
|
||||
|
||||
outputStream.Seek(receivedMsg3.StartPositions)
|
||||
outputStream.Start()
|
||||
seekMsg := consumer(ctx, outputStream)
|
||||
assert.Equal(t, len(seekMsg.Msgs), 3)
|
||||
result := []uint64{14, 12, 13}
|
||||
|
@ -1701,12 +1686,11 @@ func TestStream_BroadcastMark(t *testing.T) {
|
|||
|
||||
// add producer channels
|
||||
outputStream.AsProducer(producerChannels)
|
||||
outputStream.Start()
|
||||
|
||||
msgPack0 := MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
|
||||
|
||||
ids, err := outputStream.BroadcastMark(&msgPack0)
|
||||
ids, err := outputStream.Broadcast(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, ids)
|
||||
assert.Equal(t, len(producerChannels), len(ids))
|
||||
|
@ -1720,7 +1704,7 @@ func TestStream_BroadcastMark(t *testing.T) {
|
|||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
|
||||
|
||||
ids, err = outputStream.BroadcastMark(&msgPack1)
|
||||
ids, err = outputStream.Broadcast(&msgPack1)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, ids)
|
||||
assert.Equal(t, len(producerChannels), len(ids))
|
||||
|
@ -1731,86 +1715,19 @@ func TestStream_BroadcastMark(t *testing.T) {
|
|||
}
|
||||
|
||||
// edge cases
|
||||
_, err = outputStream.BroadcastMark(nil)
|
||||
_, err = outputStream.Broadcast(nil)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
msgPack2 := MsgPack{}
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, &MarshalFailTsMsg{})
|
||||
_, err = outputStream.BroadcastMark(&msgPack2)
|
||||
_, err = outputStream.Broadcast(&msgPack2)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// mock send fail
|
||||
for k, p := range outputStream.producers {
|
||||
outputStream.producers[k] = &mockSendFailProducer{Producer: p}
|
||||
}
|
||||
_, err = outputStream.BroadcastMark(&msgPack1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
outputStream.Close()
|
||||
}
|
||||
|
||||
func TestStream_ProduceMark(t *testing.T) {
|
||||
pulsarAddress := getPulsarAddress()
|
||||
c1 := funcutil.RandomString(8)
|
||||
c2 := funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
|
||||
factory := ProtoUDFactory{}
|
||||
pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
assert.Nil(t, err)
|
||||
outputStream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
assert.Nil(t, err)
|
||||
|
||||
// add producer channels
|
||||
outputStream.AsProducer(producerChannels)
|
||||
outputStream.Start()
|
||||
|
||||
msgPack0 := MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
|
||||
|
||||
ids, err := outputStream.ProduceMark(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, ids)
|
||||
assert.Equal(t, len(msgPack0.Msgs), len(ids))
|
||||
for _, c := range producerChannels {
|
||||
if id, ok := ids[c]; ok {
|
||||
assert.Equal(t, len(msgPack0.Msgs), len(id))
|
||||
}
|
||||
}
|
||||
|
||||
msgPack1 := MsgPack{}
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 2))
|
||||
|
||||
ids, err = outputStream.ProduceMark(&msgPack1)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, ids)
|
||||
assert.Equal(t, len(producerChannels), len(ids))
|
||||
for _, c := range producerChannels {
|
||||
ids, ok := ids[c]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, 1, len(ids))
|
||||
}
|
||||
|
||||
// edge cases
|
||||
_, err = outputStream.ProduceMark(nil)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
msgPack2 := MsgPack{}
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, &MarshalFailTsMsg{BaseMsg: BaseMsg{HashValues: []uint32{1}}})
|
||||
_, err = outputStream.ProduceMark(&msgPack2)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// mock send fail
|
||||
for k, p := range outputStream.producers {
|
||||
outputStream.producers[k] = &mockSendFailProducer{Producer: p}
|
||||
}
|
||||
_, err = outputStream.ProduceMark(&msgPack1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// mock producers is nil
|
||||
outputStream.producers = nil
|
||||
_, err = outputStream.ProduceMark(&msgPack1)
|
||||
_, err = outputStream.Broadcast(&msgPack1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
outputStream.Close()
|
||||
|
@ -2054,7 +1971,6 @@ func getPulsarInputStream(ctx context.Context, pulsarAddress string, producerCha
|
|||
for _, opt := range opts {
|
||||
inputStream.SetRepackFunc(opt)
|
||||
}
|
||||
inputStream.Start()
|
||||
return inputStream
|
||||
}
|
||||
|
||||
|
@ -2063,7 +1979,6 @@ func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerCh
|
|||
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
||||
|
@ -2072,7 +1987,6 @@ func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumer
|
|||
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
||||
|
@ -2086,7 +2000,6 @@ func getPulsarTtOutputStreamAndSeek(ctx context.Context, pulsarAddress string, p
|
|||
}
|
||||
outputStream.AsConsumer(consumerName, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown)
|
||||
outputStream.Seek(positions)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
||||
|
@ -2140,12 +2053,10 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
|
|||
|
||||
otherInputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
|
||||
otherInputStream.AsProducer([]string{"root_timetick"})
|
||||
otherInputStream.Start()
|
||||
otherInputStream.Produce(getTimeTickMsgPack(999))
|
||||
|
||||
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
|
||||
inputStream.AsProducer(producerChannels)
|
||||
inputStream.Start()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
inputStream.Produce(getTimeTickMsgPack(int64(i)))
|
||||
|
@ -2154,7 +2065,6 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
|
|||
rmqClient2, _ := rmq.NewClientWithDefaultOptions()
|
||||
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
|
||||
outputStream.Start()
|
||||
|
||||
inputStream.Produce(getTimeTickMsgPack(1000))
|
||||
pack := <-outputStream.Chan()
|
||||
|
|
|
@ -53,17 +53,13 @@ type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, erro
|
|||
|
||||
// MsgStream is an interface that can be used to produce and consume message on message queue
|
||||
type MsgStream interface {
|
||||
Start()
|
||||
Close()
|
||||
|
||||
AsProducer(channels []string)
|
||||
Produce(*MsgPack) error
|
||||
SetRepackFunc(repackFunc RepackFunc)
|
||||
ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32
|
||||
GetProduceChannels() []string
|
||||
ProduceMark(*MsgPack) (map[string][]MessageID, error)
|
||||
Broadcast(*MsgPack) error
|
||||
BroadcastMark(*MsgPack) (map[string][]MessageID, error)
|
||||
Broadcast(*MsgPack) (map[string][]MessageID, error)
|
||||
|
||||
AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
|
||||
Chan() <-chan *MsgPack
|
||||
|
|
|
@ -236,9 +236,6 @@ type simpleMockMsgStream struct {
|
|||
msgCountMtx sync.RWMutex
|
||||
}
|
||||
|
||||
func (ms *simpleMockMsgStream) Start() {
|
||||
}
|
||||
|
||||
func (ms *simpleMockMsgStream) Close() {
|
||||
}
|
||||
|
||||
|
@ -259,26 +256,6 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) {
|
|||
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
|
||||
}
|
||||
|
||||
func (ms *simpleMockMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {
|
||||
if len(tsMsgs) <= 0 {
|
||||
return nil
|
||||
}
|
||||
reBucketValues := make([][]int32, len(tsMsgs))
|
||||
channelNum := uint32(1)
|
||||
if channelNum == 0 {
|
||||
return nil
|
||||
}
|
||||
for idx, tsMsg := range tsMsgs {
|
||||
hashValues := tsMsg.HashKeys()
|
||||
bucketValues := make([]int32, len(hashValues))
|
||||
for index, hashValue := range hashValues {
|
||||
bucketValues[index] = int32(hashValue % channelNum)
|
||||
}
|
||||
reBucketValues[idx] = bucketValues
|
||||
}
|
||||
return reBucketValues
|
||||
}
|
||||
|
||||
func (ms *simpleMockMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {
|
||||
}
|
||||
|
||||
|
@ -308,18 +285,7 @@ func (ms *simpleMockMsgStream) Produce(pack *msgstream.MsgPack) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ms *simpleMockMsgStream) ProduceMark(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
defer ms.increaseMsgCount(1)
|
||||
ms.msgChan <- pack
|
||||
|
||||
return map[string][]msgstream.MessageID{}, nil
|
||||
}
|
||||
|
||||
func (ms *simpleMockMsgStream) Broadcast(pack *msgstream.MsgPack) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *simpleMockMsgStream) BroadcastMark(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
func (ms *simpleMockMsgStream) Broadcast(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
return map[string][]msgstream.MessageID{}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -279,7 +279,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
pos1, err := stream.ProduceMark(&msgstream.MsgPack{Msgs: []msgstream.TsMsg{timeTickMsg}})
|
||||
pos1, err := stream.Broadcast(&msgstream.MsgPack{Msgs: []msgstream.TsMsg{timeTickMsg}})
|
||||
assert.NoError(t, err)
|
||||
msgIDs, ok := pos1[pDmChannel]
|
||||
assert.True(t, ok)
|
||||
|
|
|
@ -751,7 +751,6 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stream.Start()
|
||||
|
||||
delData := &deleteData{
|
||||
deleteIDs: make(map[UniqueID][]primaryKey),
|
||||
|
|
|
@ -934,8 +934,6 @@ func (ms *LoadDeleteMsgStream) GetLatestMsgID(channel string) (msgstream.Message
|
|||
return msg.(msgstream.MessageID), err.(error)
|
||||
}
|
||||
|
||||
func (ms *LoadDeleteMsgStream) Start() {}
|
||||
|
||||
type getCollectionByIDFunc func(collectionID UniqueID) (*Collection, error)
|
||||
|
||||
type mockReplicaInterface struct {
|
||||
|
|
|
@ -231,7 +231,7 @@ func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) err
|
|||
|
||||
dms.mutex.RLock()
|
||||
if dms.refcnt > 0 {
|
||||
if err := dms.ms.Broadcast(pack); err != nil {
|
||||
if _, err := dms.ms.Broadcast(pack); err != nil {
|
||||
log.Error("Broadcast failed", zap.Error(err), zap.String("chanName", chanName))
|
||||
dms.mutex.RUnlock()
|
||||
return err
|
||||
|
@ -254,7 +254,7 @@ func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack)
|
|||
|
||||
dms.mutex.RLock()
|
||||
if dms.refcnt > 0 {
|
||||
ids, err := dms.ms.BroadcastMark(pack)
|
||||
ids, err := dms.ms.Broadcast(pack)
|
||||
if err != nil {
|
||||
log.Error("BroadcastMark failed", zap.Error(err), zap.String("chanName", chanName))
|
||||
dms.mutex.RUnlock()
|
||||
|
|
|
@ -209,29 +209,18 @@ type FailMsgStream struct {
|
|||
errBroadcast bool
|
||||
}
|
||||
|
||||
func (ms *FailMsgStream) Start() {}
|
||||
func (ms *FailMsgStream) Close() {}
|
||||
func (ms *FailMsgStream) Chan() <-chan *msgstream.MsgPack { return nil }
|
||||
func (ms *FailMsgStream) AsProducer(channels []string) {}
|
||||
func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
|
||||
func (ms *FailMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
|
||||
}
|
||||
func (ms *FailMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}
|
||||
func (ms *FailMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { return nil }
|
||||
func (ms *FailMsgStream) GetProduceChannels() []string { return nil }
|
||||
func (ms *FailMsgStream) Produce(*msgstream.MsgPack) error { return nil }
|
||||
func (ms *FailMsgStream) ProduceMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (ms *FailMsgStream) Broadcast(*msgstream.MsgPack) error {
|
||||
func (ms *FailMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}
|
||||
func (ms *FailMsgStream) GetProduceChannels() []string { return nil }
|
||||
func (ms *FailMsgStream) Produce(*msgstream.MsgPack) error { return nil }
|
||||
func (ms *FailMsgStream) Broadcast(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
if ms.errBroadcast {
|
||||
return errors.New("broadcast error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (ms *FailMsgStream) BroadcastMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
|
||||
if ms.errBroadcast {
|
||||
return nil, errors.New("broadcastMark error")
|
||||
return nil, errors.New("broadcast error")
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -52,7 +52,6 @@ func (inNode *InputNode) IsInputNode() bool {
|
|||
|
||||
// Start is used to start input msgstream
|
||||
func (inNode *InputNode) Start() {
|
||||
inNode.inStream.Start()
|
||||
}
|
||||
|
||||
// Close implements node
|
||||
|
|
|
@ -33,7 +33,6 @@ func TestInputNode(t *testing.T) {
|
|||
msgStream, _ := factory.NewMsgStream(context.TODO())
|
||||
channels := []string{"cc"}
|
||||
msgStream.AsConsumer(channels, "sub", mqwrapper.SubscriptionPositionEarliest)
|
||||
msgStream.Start()
|
||||
|
||||
msgPack := generateMsgPack()
|
||||
produceStream, _ := factory.NewMsgStream(context.TODO())
|
||||
|
|
Loading…
Reference in New Issue