mirror of https://github.com/milvus-io/milvus.git
parent
9aa1fd25fa
commit
3630eec92c
|
@ -102,8 +102,7 @@ func (ms *mqMsgStream) AsProducer(channels []string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (ms *mqMsgStream) AsConsumer(channels []string,
|
||||
subName string) {
|
||||
func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
|
||||
for _, channel := range channels {
|
||||
if _, ok := ms.consumers[channel]; ok {
|
||||
continue
|
||||
|
@ -190,14 +189,14 @@ func (ms *mqMsgStream) GetProduceChannels() []string {
|
|||
}
|
||||
|
||||
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
|
||||
tsMsgs := msgPack.Msgs
|
||||
if len(tsMsgs) <= 0 {
|
||||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
log.Debug("Warning: Receive empty msgPack")
|
||||
return nil
|
||||
}
|
||||
if len(ms.producers) <= 0 {
|
||||
return errors.New("nil producer in msg stream")
|
||||
}
|
||||
tsMsgs := msgPack.Msgs
|
||||
reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
|
||||
var result map[int32]*MsgPack
|
||||
var err error
|
||||
|
@ -251,6 +250,10 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
|
|||
}
|
||||
|
||||
func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
|
||||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
log.Debug("Warning: Receive empty msgPack")
|
||||
return nil
|
||||
}
|
||||
for _, v := range msgPack.Msgs {
|
||||
sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v)
|
||||
|
||||
|
@ -479,7 +482,6 @@ func (ms *MqTtMsgStream) Close() {
|
|||
|
||||
func (ms *MqTtMsgStream) bufMsgPackToChannel() {
|
||||
defer ms.wait.Done()
|
||||
ms.unsolvedBuf = make(map[mqclient.Consumer][]TsMsg)
|
||||
isChannelReady := make(map[mqclient.Consumer]bool)
|
||||
eofMsgTimeStamp := make(map[mqclient.Consumer]Timestamp)
|
||||
|
||||
|
@ -503,11 +505,11 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
|
|||
wg.Add(1)
|
||||
go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex)
|
||||
}
|
||||
ms.consumerLock.Unlock()
|
||||
wg.Wait()
|
||||
timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex)
|
||||
if !ok || timeStamp <= ms.lastTimeStamp {
|
||||
//log.Printf("All timeTick's timestamps are inconsistent")
|
||||
ms.consumerLock.Unlock()
|
||||
continue
|
||||
}
|
||||
timeTickBuf := make([]TsMsg, 0)
|
||||
|
@ -553,6 +555,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
|
|||
ms.msgPositions[consumer] = newPos
|
||||
}
|
||||
ms.unsolvedMutex.Unlock()
|
||||
ms.consumerLock.Unlock()
|
||||
|
||||
msgPack := MsgPack{
|
||||
BeginTs: ms.lastTimeStamp,
|
||||
|
@ -712,9 +715,9 @@ func (ms *MqTtMsgStream) Seek(mp *internalpb.MsgPosition) error {
|
|||
ms.addConsumer(consumer, seekChannel)
|
||||
|
||||
//TODO: May cause problem
|
||||
if len(consumer.Chan()) == 0 {
|
||||
return nil
|
||||
}
|
||||
//if len(consumer.Chan()) == 0 {
|
||||
// return nil
|
||||
//}
|
||||
|
||||
for {
|
||||
select {
|
||||
|
|
|
@ -13,10 +13,11 @@ package msgstream
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -57,7 +58,9 @@ func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg {
|
||||
func getTsMsg(msgType MsgType, reqID UniqueID) TsMsg {
|
||||
hashValue := uint32(reqID)
|
||||
time := uint64(reqID)
|
||||
baseMsg := BaseMsg{
|
||||
BeginTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
|
@ -69,14 +72,14 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg {
|
|||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
MsgID: reqID,
|
||||
Timestamp: 11,
|
||||
Timestamp: time,
|
||||
SourceID: reqID,
|
||||
},
|
||||
CollectionName: "Collection",
|
||||
PartitionName: "Partition",
|
||||
SegmentID: 1,
|
||||
ChannelID: "0",
|
||||
Timestamps: []Timestamp{uint64(reqID)},
|
||||
Timestamps: []Timestamp{time},
|
||||
RowIDs: []int64{1},
|
||||
RowData: []*commonpb.Blob{{}},
|
||||
}
|
||||
|
@ -164,7 +167,9 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg {
|
|||
return nil
|
||||
}
|
||||
|
||||
func getTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg {
|
||||
func getTimeTickMsg(reqID UniqueID) TsMsg {
|
||||
hashValue := uint32(reqID)
|
||||
time := uint64(reqID)
|
||||
baseMsg := BaseMsg{
|
||||
BeginTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
|
@ -185,41 +190,30 @@ func getTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg {
|
|||
return timeTickMsg
|
||||
}
|
||||
|
||||
func initPulsarStream(pulsarAddress string,
|
||||
producerChannels []string,
|
||||
consumerChannels []string,
|
||||
consumerSubName string,
|
||||
opts ...RepackFunc) (MsgStream, MsgStream) {
|
||||
factory := ProtoUDFactory{}
|
||||
|
||||
// set input stream
|
||||
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
inputStream.AsProducer(producerChannels)
|
||||
for _, opt := range opts {
|
||||
inputStream.SetRepackFunc(opt)
|
||||
// Generate MsgPack contains 'num' msgs, with timestamp in (start, end)
|
||||
func getInsertMsgPack(num int, start int, end int) *MsgPack {
|
||||
Rand := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
set := make(map[int]bool)
|
||||
msgPack := MsgPack{}
|
||||
for len(set) < num {
|
||||
reqID := Rand.Int()%(end-start-1) + start + 1
|
||||
_, ok := set[reqID]
|
||||
if !ok {
|
||||
set[reqID] = true
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(reqID)))
|
||||
}
|
||||
}
|
||||
inputStream.Start()
|
||||
var input MsgStream = inputStream
|
||||
|
||||
// set output stream
|
||||
pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName)
|
||||
outputStream.Start()
|
||||
var output MsgStream = outputStream
|
||||
|
||||
return input, output
|
||||
return &msgPack
|
||||
}
|
||||
|
||||
func initPulsarTtStream(pulsarAddress string,
|
||||
producerChannels []string,
|
||||
consumerChannels []string,
|
||||
consumerSubName string,
|
||||
opts ...RepackFunc) (MsgStream, MsgStream) {
|
||||
factory := ProtoUDFactory{}
|
||||
func getTimeTickMsgPack(reqID UniqueID) *MsgPack {
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(reqID))
|
||||
return &msgPack
|
||||
}
|
||||
|
||||
// set input stream
|
||||
func getPulsarInputStream(pulsarAddress string, producerChannels []string, opts ...RepackFunc) MsgStream {
|
||||
factory := ProtoUDFactory{}
|
||||
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
inputStream.AsProducer(producerChannels)
|
||||
|
@ -227,16 +221,39 @@ func initPulsarTtStream(pulsarAddress string,
|
|||
inputStream.SetRepackFunc(opt)
|
||||
}
|
||||
inputStream.Start()
|
||||
var input MsgStream = inputStream
|
||||
return inputStream
|
||||
}
|
||||
|
||||
// set output stream
|
||||
pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
|
||||
func getPulsarOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
|
||||
factory := ProtoUDFactory{}
|
||||
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName)
|
||||
outputStream.Start()
|
||||
var output MsgStream = outputStream
|
||||
return outputStream
|
||||
}
|
||||
|
||||
return input, output
|
||||
func getPulsarTtOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
|
||||
factory := ProtoUDFactory{}
|
||||
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
outputStream.AsConsumer(consumerChannels, consumerSubName)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
||||
func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPosition) MsgStream {
|
||||
factory := ProtoUDFactory{}
|
||||
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
//outputStream.AsConsumer(consumerChannels, consumerSubName)
|
||||
outputStream.Start()
|
||||
for _, pos := range positions {
|
||||
pos.MsgGroup = funcutil.RandomString(4)
|
||||
outputStream.Seek(pos)
|
||||
}
|
||||
//outputStream.Start()
|
||||
return outputStream
|
||||
}
|
||||
|
||||
func receiveMsg(outputStream MsgStream, msgCount int) {
|
||||
|
@ -247,8 +264,9 @@ func receiveMsg(outputStream MsgStream, msgCount int) {
|
|||
msgs := result.Msgs
|
||||
for _, v := range msgs {
|
||||
receiveCount++
|
||||
fmt.Println("msg type: ", v.Type(), ", msg value: ", v)
|
||||
log.Println("msg type: ", v.Type(), ", msg value: ", v)
|
||||
}
|
||||
log.Println("================")
|
||||
}
|
||||
if receiveCount >= msgCount {
|
||||
break
|
||||
|
@ -256,6 +274,17 @@ func receiveMsg(outputStream MsgStream, msgCount int) {
|
|||
}
|
||||
}
|
||||
|
||||
func printMsgPack(msgPack *MsgPack) {
|
||||
if msgPack == nil {
|
||||
log.Println("msg nil")
|
||||
} else {
|
||||
for _, v := range msgPack.Msgs {
|
||||
log.Println("msg type: ", v.Type(), ", msg value: ", v)
|
||||
}
|
||||
}
|
||||
log.Println("================")
|
||||
}
|
||||
|
||||
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
|
@ -264,10 +293,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
|
|||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("produce error = %v", err)
|
||||
|
@ -276,7 +307,6 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
|
|||
receiveMsg(outputStream, len(msgPack.Msgs))
|
||||
inputStream.Close()
|
||||
outputStream.Close()
|
||||
|
||||
}
|
||||
|
||||
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
|
||||
|
@ -286,10 +316,12 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) {
|
|||
consumerChannels := []string{c}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 1))
|
||||
//msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 3, 3))
|
||||
|
||||
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
err := inputStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("produce error = %v", err)
|
||||
|
@ -307,10 +339,12 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) {
|
|||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 3))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("produce error = %v", err)
|
||||
|
@ -327,10 +361,12 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
|
|||
consumerChannels := []string{c}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("produce error = %v", err)
|
||||
|
@ -347,10 +383,12 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
|
|||
consumerChannels := []string{c}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("produce error = %v", err)
|
||||
|
@ -368,10 +406,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
|
|||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Broadcast(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("produce error = %v", err)
|
||||
|
@ -389,10 +429,11 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
|
|||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
|
||||
|
||||
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc)
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels, repackFunc)
|
||||
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
err := inputStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("produce error = %v", err)
|
||||
|
@ -521,10 +562,10 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
|
|||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 2, 2))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4, 4))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 2))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4))
|
||||
|
||||
factory := ProtoUDFactory{}
|
||||
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
|
@ -554,16 +595,18 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
|
|||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack0 := MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
|
||||
|
||||
msgPack1 := MsgPack{}
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
|
||||
|
||||
msgPack2 := MsgPack{}
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
if err != nil {
|
||||
log.Fatalf("broadcast error = %v", err)
|
||||
|
@ -589,26 +632,28 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
|
|||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack0 := MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
|
||||
|
||||
msgPack1 := MsgPack{}
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19, 19))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19))
|
||||
|
||||
msgPack2 := MsgPack{}
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5))
|
||||
|
||||
msgPack3 := MsgPack{}
|
||||
msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14, 14))
|
||||
msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9, 9))
|
||||
msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14))
|
||||
msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9))
|
||||
|
||||
msgPack4 := MsgPack{}
|
||||
msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11, 11, 11))
|
||||
msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11))
|
||||
|
||||
msgPack5 := MsgPack{}
|
||||
msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15))
|
||||
msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
assert.Nil(t, err)
|
||||
err = inputStream.Produce(&msgPack1)
|
||||
|
@ -622,15 +667,15 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
|
|||
|
||||
outputStream.Consume()
|
||||
receivedMsg := outputStream.Consume()
|
||||
for _, position := range receivedMsg.StartPositions {
|
||||
outputStream.Seek(position)
|
||||
}
|
||||
outputStream.Close()
|
||||
outputStream = getPulsarTtOutputStreamAndSeek(pulsarAddress, receivedMsg.EndPositions)
|
||||
|
||||
err = inputStream.Broadcast(&msgPack5)
|
||||
assert.Nil(t, err)
|
||||
//seekMsg, _ := outputStream.Consume()
|
||||
//for _, msg := range seekMsg.Msgs {
|
||||
// assert.Equal(t, msg.BeginTs(), uint64(14))
|
||||
//}
|
||||
seekMsg := outputStream.Consume()
|
||||
for _, msg := range seekMsg.Msgs {
|
||||
assert.Equal(t, msg.BeginTs(), uint64(14))
|
||||
}
|
||||
inputStream.Close()
|
||||
outputStream.Close()
|
||||
}
|
||||
|
@ -643,16 +688,18 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
|
|||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack0 := MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
|
||||
|
||||
msgPack1 := MsgPack{}
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
|
||||
|
||||
msgPack2 := MsgPack{}
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
|
||||
err := inputStream.Broadcast(&msgPack0)
|
||||
if err != nil {
|
||||
log.Fatalf("broadcast error = %v", err)
|
||||
|
@ -670,6 +717,170 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
|
|||
outputStream.Close()
|
||||
}
|
||||
|
||||
//
|
||||
// This testcase will generate MsgPacks as following:
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
//
|
||||
// Then check:
|
||||
// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be
|
||||
// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs
|
||||
// 2. The count of consumed msg should be equal to the count of produced msg
|
||||
//
|
||||
func TestStream_PulsarTtMsgStream_1(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
const msgsInPack = 5
|
||||
const numOfMsgPack = 10
|
||||
msgPacks := make([]*MsgPack, numOfMsgPack)
|
||||
|
||||
// generate MsgPack
|
||||
for i := 0; i < numOfMsgPack; i++ {
|
||||
if i%2 == 0 {
|
||||
msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*10, i/2*10+22)
|
||||
} else {
|
||||
msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * 10))
|
||||
}
|
||||
}
|
||||
msgPacks = append(msgPacks, nil)
|
||||
msgPacks = append(msgPacks, getTimeTickMsgPack(100))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
|
||||
// produce msg
|
||||
log.Println("==============produce msg==================")
|
||||
for i := 0; i < len(msgPacks); i++ {
|
||||
printMsgPack(msgPacks[i])
|
||||
if i%2 == 0 {
|
||||
// insert msg use Produce
|
||||
err := inputStream.Produce(msgPacks[i])
|
||||
assert.Nil(t, err)
|
||||
} else {
|
||||
// tt msg use Broadcast
|
||||
err := inputStream.Broadcast(msgPacks[i])
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// consume msg
|
||||
log.Println("===============receive msg=================")
|
||||
checkNMsgPack := func(t *testing.T, outputStream MsgStream, num int) int {
|
||||
rcvMsg := 0
|
||||
for i := 0; i < num; i++ {
|
||||
msgPack := outputStream.Consume()
|
||||
rcvMsg += len(msgPack.Msgs)
|
||||
if len(msgPack.Msgs) > 0 {
|
||||
for _, msg := range msgPack.Msgs {
|
||||
log.Println("msg type: ", msg.Type(), ", msg value: ", msg)
|
||||
assert.Greater(t, msg.BeginTs(), msgPack.BeginTs)
|
||||
assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs)
|
||||
}
|
||||
log.Println("================")
|
||||
}
|
||||
}
|
||||
return rcvMsg
|
||||
}
|
||||
msgCount := checkNMsgPack(t, outputStream, len(msgPacks)/2)
|
||||
assert.Equal(t, (len(msgPacks)/2-1)*msgsInPack, msgCount)
|
||||
|
||||
inputStream.Close()
|
||||
outputStream.Close()
|
||||
}
|
||||
|
||||
//
|
||||
// This testcase will generate MsgPacks as following:
|
||||
//
|
||||
// Insert Insert Insert Insert Insert Insert
|
||||
// |----------|----------|----------|----------|----------|----------|
|
||||
// ^ ^ ^ ^ ^ ^
|
||||
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
|
||||
//
|
||||
// Then check:
|
||||
// 1. ttMsgStream consumer can seek to the right position and resume
|
||||
// 2. The count of consumed msg should be equal to the count of produced msg
|
||||
//
|
||||
func TestStream_PulsarTtMsgStream_2(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
const msgsInPack = 5
|
||||
const numOfMsgPack = 10
|
||||
msgPacks := make([]*MsgPack, numOfMsgPack)
|
||||
|
||||
// generate MsgPack
|
||||
for i := 0; i < numOfMsgPack; i++ {
|
||||
if i%2 == 0 {
|
||||
msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*10, i/2*10+22)
|
||||
} else {
|
||||
msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * 10))
|
||||
}
|
||||
}
|
||||
msgPacks = append(msgPacks, nil)
|
||||
msgPacks = append(msgPacks, getTimeTickMsgPack(100))
|
||||
|
||||
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
|
||||
|
||||
// produce msg
|
||||
log.Println("===============produce msg=================")
|
||||
for i := 0; i < len(msgPacks); i++ {
|
||||
printMsgPack(msgPacks[i])
|
||||
if i%2 == 0 {
|
||||
// insert msg use Produce
|
||||
err := inputStream.Produce(msgPacks[i])
|
||||
assert.Nil(t, err)
|
||||
} else {
|
||||
// tt msg use Broadcast
|
||||
err := inputStream.Broadcast(msgPacks[i])
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// consume msg
|
||||
log.Println("=============receive msg===================")
|
||||
rcvMsgPacks := make([]*MsgPack, 0)
|
||||
|
||||
resumeMsgPack := func(t *testing.T) int {
|
||||
var outputStream MsgStream
|
||||
msgCount := len(rcvMsgPacks)
|
||||
if msgCount == 0 {
|
||||
outputStream = getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
|
||||
} else {
|
||||
outputStream = getPulsarTtOutputStreamAndSeek(pulsarAddress, rcvMsgPacks[msgCount-1].EndPositions)
|
||||
}
|
||||
msgPack := outputStream.Consume()
|
||||
rcvMsgPacks = append(rcvMsgPacks, msgPack)
|
||||
if len(msgPack.Msgs) > 0 {
|
||||
for _, msg := range msgPack.Msgs {
|
||||
log.Println("msg type: ", msg.Type(), ", msg value: ", msg)
|
||||
assert.Greater(t, msg.BeginTs(), msgPack.BeginTs)
|
||||
assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs)
|
||||
}
|
||||
log.Println("================")
|
||||
}
|
||||
outputStream.Close()
|
||||
return len(rcvMsgPacks[msgCount].Msgs)
|
||||
}
|
||||
|
||||
msgCount := 0
|
||||
for i := 0; i < len(msgPacks)/2; i++ {
|
||||
msgCount += resumeMsgPack(t)
|
||||
}
|
||||
assert.Equal(t, (len(msgPacks)/2-1)*msgsInPack, msgCount)
|
||||
|
||||
inputStream.Close()
|
||||
}
|
||||
|
||||
/****************************************Rmq test******************************************/
|
||||
|
||||
func initRmq(name string) *etcdkv.EtcdKV {
|
||||
|
@ -698,7 +909,7 @@ func Close(rocksdbName string, intputStream, outputStream MsgStream, etcdKV *etc
|
|||
outputStream.Close()
|
||||
etcdKV.Close()
|
||||
err := os.RemoveAll(rocksdbName)
|
||||
fmt.Println(err)
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
func initRmqStream(producerChannels []string,
|
||||
|
@ -755,8 +966,8 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) {
|
|||
consumerGroupName := "InsertGroup"
|
||||
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
|
||||
|
||||
rocksdbName := "/tmp/rocksmq_insert"
|
||||
etcdKV := initRmq(rocksdbName)
|
||||
|
@ -776,14 +987,14 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
|
|||
consumerSubName := "subInsert"
|
||||
|
||||
msgPack0 := MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
|
||||
|
||||
msgPack1 := MsgPack{}
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
|
||||
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
|
||||
|
||||
msgPack2 := MsgPack{}
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
|
||||
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5))
|
||||
|
||||
rocksdbName := "/tmp/rocksmq_insert_tt"
|
||||
etcdKV := initRmq(rocksdbName)
|
||||
|
|
Loading…
Reference in New Issue