Optimize MqTtMsgStream and unittest (#5498)

* code optimize

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add getTsMsgFromConsumerMsg

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename some variables for better readability

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* code optimize

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* code optimize

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add chanWaitGroup

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add chanTtMsgTime

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename lastTimeStamp to currTimeStamp

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add comments

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* code optimize

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* support unsynced ttMsg

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* code optimize

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update testcase

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update testcase

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update testcase

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add TestStream_PulsarTtMsgStream_3

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* remove debug log

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* change ttMsg logic back

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix ci block issue

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update testcase

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix codacy

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/5501/head
Cai Yudong 2021-05-31 11:31:30 +08:00 committed by GitHub
parent 4af2cacb28
commit c468481a36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 206 additions and 196 deletions

View File

@ -124,8 +124,10 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
return errors.New("Consumer is nil") return errors.New("Consumer is nil")
} }
ms.consumerLock.Lock()
ms.consumers[channel] = pc ms.consumers[channel] = pc
ms.consumerChannels = append(ms.consumerChannels, channel) ms.consumerChannels = append(ms.consumerChannels, channel)
ms.consumerLock.Unlock()
ms.wait.Add(1) ms.wait.Add(1)
go ms.receiveMsg(pc) go ms.receiveMsg(pc)
return nil return nil
@ -292,19 +294,39 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
func (ms *mqMsgStream) Consume() *MsgPack { func (ms *mqMsgStream) Consume() *MsgPack {
for { for {
select { select {
case <-ms.ctx.Done():
//log.Debug("context closed")
return nil
case cm, ok := <-ms.receiveBuf: case cm, ok := <-ms.receiveBuf:
if !ok { if !ok {
log.Debug("buf chan closed") log.Debug("buf chan closed")
return nil return nil
} }
return cm return cm
case <-ms.ctx.Done():
//log.Debug("context closed")
return nil
} }
} }
} }
func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg mqclient.ConsumerMessage) (TsMsg, error) {
header := commonpb.MsgHeader{}
err := proto.Unmarshal(msg.Payload(), &header)
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal message header, err %s", err.Error())
}
tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), header.Base.MsgType)
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal tsMsg, err %s", err.Error())
}
// set msg info to tsMsg
tsMsg.SetPosition(&MsgPosition{
ChannelName: filepath.Base(msg.Topic()),
MsgID: msg.ID().Serialize(),
})
return tsMsg, nil
}
func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) { func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
defer ms.wait.Done() defer ms.wait.Done()
@ -317,15 +339,10 @@ func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
return return
} }
consumer.Ack(msg) consumer.Ack(msg)
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(msg.Payload(), &headerMsg) tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
if err != nil { if err != nil {
log.Error("Failed to unmarshal message header", zap.Error(err)) log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
continue
}
tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
if err != nil {
log.Error("Failed to unmarshal tsMsg", zap.Error(err))
continue continue
} }
@ -334,12 +351,6 @@ func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp)) tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
} }
tsMsg.SetPosition(&MsgPosition{
ChannelName: filepath.Base(msg.Topic()),
//FIXME
MsgID: msg.ID().Serialize(),
})
msgPack := MsgPack{Msgs: []TsMsg{tsMsg}} msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
ms.receiveBuf <- &msgPack ms.receiveBuf <- &msgPack
@ -372,12 +383,15 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
type MqTtMsgStream struct { type MqTtMsgStream struct {
mqMsgStream mqMsgStream
unsolvedBuf map[mqclient.Consumer][]TsMsg chanMsgBuf map[mqclient.Consumer][]TsMsg
msgPositions map[mqclient.Consumer]*internalpb.MsgPosition chanMsgPos map[mqclient.Consumer]*internalpb.MsgPosition
unsolvedMutex *sync.Mutex chanStopChan map[mqclient.Consumer]chan bool
lastTimeStamp Timestamp chanTtMsgTime map[mqclient.Consumer]Timestamp
syncConsumer chan int chanMsgBufMutex *sync.Mutex
stopConsumeChan map[mqclient.Consumer]chan bool chanTtMsgTimeMutex *sync.RWMutex
chanWaitGroup *sync.WaitGroup
lastTimeStamp Timestamp
syncConsumer chan int
} }
func NewMqTtMsgStream(ctx context.Context, func NewMqTtMsgStream(ctx context.Context,
@ -389,18 +403,22 @@ func NewMqTtMsgStream(ctx context.Context,
if err != nil { if err != nil {
return nil, err return nil, err
} }
unsolvedBuf := make(map[mqclient.Consumer][]TsMsg) chanMsgBuf := make(map[mqclient.Consumer][]TsMsg)
stopChannel := make(map[mqclient.Consumer]chan bool) chanMsgPos := make(map[mqclient.Consumer]*internalpb.MsgPosition)
msgPositions := make(map[mqclient.Consumer]*internalpb.MsgPosition) chanStopChan := make(map[mqclient.Consumer]chan bool)
chanTtMsgTime := make(map[mqclient.Consumer]Timestamp)
syncConsumer := make(chan int, 1) syncConsumer := make(chan int, 1)
return &MqTtMsgStream{ return &MqTtMsgStream{
mqMsgStream: *msgStream, mqMsgStream: *msgStream,
unsolvedBuf: unsolvedBuf, chanMsgBuf: chanMsgBuf,
msgPositions: msgPositions, chanMsgPos: chanMsgPos,
unsolvedMutex: &sync.Mutex{}, chanStopChan: chanStopChan,
syncConsumer: syncConsumer, chanTtMsgTime: chanTtMsgTime,
stopConsumeChan: stopChannel, chanMsgBufMutex: &sync.Mutex{},
chanTtMsgTimeMutex: &sync.RWMutex{},
chanWaitGroup: &sync.WaitGroup{},
syncConsumer: syncConsumer,
}, nil }, nil
} }
@ -409,19 +427,19 @@ func (ms *MqTtMsgStream) addConsumer(consumer mqclient.Consumer, channel string)
ms.syncConsumer <- 1 ms.syncConsumer <- 1
} }
ms.consumers[channel] = consumer ms.consumers[channel] = consumer
ms.unsolvedBuf[consumer] = make([]TsMsg, 0)
ms.consumerChannels = append(ms.consumerChannels, channel) ms.consumerChannels = append(ms.consumerChannels, channel)
ms.msgPositions[consumer] = &internalpb.MsgPosition{ ms.chanMsgBuf[consumer] = make([]TsMsg, 0)
ms.chanMsgPos[consumer] = &internalpb.MsgPosition{
ChannelName: channel, ChannelName: channel,
MsgID: make([]byte, 0), MsgID: make([]byte, 0),
Timestamp: ms.lastTimeStamp, Timestamp: ms.lastTimeStamp,
} }
stopConsumeChan := make(chan bool) ms.chanStopChan[consumer] = make(chan bool)
ms.stopConsumeChan[consumer] = stopConsumeChan ms.chanTtMsgTime[consumer] = 0
} }
func (ms *MqTtMsgStream) AsConsumer(channels []string, // AsConsumer subscribes channels as consumer for a MsgStream
subName string) { func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
for _, channel := range channels { for _, channel := range channels {
if _, ok := ms.consumers[channel]; ok { if _, ok := ms.consumers[channel]; ok {
continue continue
@ -484,9 +502,9 @@ func (ms *MqTtMsgStream) Close() {
func (ms *MqTtMsgStream) bufMsgPackToChannel() { func (ms *MqTtMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done() defer ms.wait.Done()
isChannelReady := make(map[mqclient.Consumer]bool) chanTtMsgSync := make(map[mqclient.Consumer]bool)
eofMsgTimeStamp := make(map[mqclient.Consumer]Timestamp)
// block here until addConsumer
if _, ok := <-ms.syncConsumer; !ok { if _, ok := <-ms.syncConsumer; !ok {
log.Debug("consumer closed!") log.Debug("consumer closed!")
return return
@ -497,28 +515,30 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
case <-ms.ctx.Done(): case <-ms.ctx.Done():
return return
default: default:
wg := sync.WaitGroup{}
findMapMutex := sync.RWMutex{}
ms.consumerLock.Lock() ms.consumerLock.Lock()
// wait all channels get ttMsg
for _, consumer := range ms.consumers { for _, consumer := range ms.consumers {
if isChannelReady[consumer] { if !chanTtMsgSync[consumer] {
continue ms.chanWaitGroup.Add(1)
go ms.consumeToTtMsg(consumer)
} }
wg.Add(1)
go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex)
} }
wg.Wait() ms.chanWaitGroup.Wait()
timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex)
if !ok || timeStamp <= ms.lastTimeStamp { // block here until all channels reach same timetick
currTs, ok := ms.allChanReachSameTtMsg(chanTtMsgSync)
if !ok || currTs <= ms.lastTimeStamp {
//log.Printf("All timeTick's timestamps are inconsistent") //log.Printf("All timeTick's timestamps are inconsistent")
ms.consumerLock.Unlock() ms.consumerLock.Unlock()
continue continue
} }
timeTickBuf := make([]TsMsg, 0) timeTickBuf := make([]TsMsg, 0)
startMsgPosition := make([]*internalpb.MsgPosition, 0) startMsgPosition := make([]*internalpb.MsgPosition, 0)
endMsgPositions := make([]*internalpb.MsgPosition, 0) endMsgPositions := make([]*internalpb.MsgPosition, 0)
ms.unsolvedMutex.Lock() ms.chanMsgBufMutex.Lock()
for consumer, msgs := range ms.unsolvedBuf { for consumer, msgs := range ms.chanMsgBuf {
if len(msgs) == 0 { if len(msgs) == 0 {
continue continue
} }
@ -529,61 +549,65 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
timeTickMsg = v timeTickMsg = v
continue continue
} }
if v.EndTs() <= timeStamp { if v.EndTs() <= currTs {
timeTickBuf = append(timeTickBuf, v) timeTickBuf = append(timeTickBuf, v)
//log.Debug("pack msg", zap.Uint64("curr", v.EndTs()), zap.Uint64("currTs", currTs))
} else { } else {
tempBuffer = append(tempBuffer, v) tempBuffer = append(tempBuffer, v)
} }
} }
ms.unsolvedBuf[consumer] = tempBuffer ms.chanMsgBuf[consumer] = tempBuffer
startMsgPosition = append(startMsgPosition, ms.msgPositions[consumer]) startMsgPosition = append(startMsgPosition, ms.chanMsgPos[consumer])
var newPos *internalpb.MsgPosition var newPos *internalpb.MsgPosition
if len(tempBuffer) > 0 { if len(tempBuffer) > 0 {
// if tempBuffer is not empty, use tempBuffer[0] to seek
newPos = &internalpb.MsgPosition{ newPos = &internalpb.MsgPosition{
ChannelName: tempBuffer[0].Position().ChannelName, ChannelName: tempBuffer[0].Position().ChannelName,
MsgID: tempBuffer[0].Position().MsgID, MsgID: tempBuffer[0].Position().MsgID,
Timestamp: timeStamp, Timestamp: currTs,
MsgGroup: consumer.Subscription(), MsgGroup: consumer.Subscription(),
} }
endMsgPositions = append(endMsgPositions, newPos) endMsgPositions = append(endMsgPositions, newPos)
} else { } else if timeTickMsg != nil {
// if tempBuffer is empty, use timeTickMsg to seek
newPos = &internalpb.MsgPosition{ newPos = &internalpb.MsgPosition{
ChannelName: timeTickMsg.Position().ChannelName, ChannelName: timeTickMsg.Position().ChannelName,
MsgID: timeTickMsg.Position().MsgID, MsgID: timeTickMsg.Position().MsgID,
Timestamp: timeStamp, Timestamp: currTs,
MsgGroup: consumer.Subscription(), MsgGroup: consumer.Subscription(),
} }
endMsgPositions = append(endMsgPositions, newPos) endMsgPositions = append(endMsgPositions, newPos)
} }
ms.msgPositions[consumer] = newPos ms.chanMsgPos[consumer] = newPos
} }
ms.unsolvedMutex.Unlock() ms.chanMsgBufMutex.Unlock()
ms.consumerLock.Unlock() ms.consumerLock.Unlock()
msgPack := MsgPack{ msgPack := MsgPack{
BeginTs: ms.lastTimeStamp, BeginTs: ms.lastTimeStamp,
EndTs: timeStamp, EndTs: currTs,
Msgs: timeTickBuf, Msgs: timeTickBuf,
StartPositions: startMsgPosition, StartPositions: startMsgPosition,
EndPositions: endMsgPositions, EndPositions: endMsgPositions,
} }
//log.Debug("send msg pack", zap.Int("len", len(msgPack.Msgs)), zap.Uint64("currTs", currTs))
ms.receiveBuf <- &msgPack ms.receiveBuf <- &msgPack
ms.lastTimeStamp = timeStamp ms.lastTimeStamp = currTs
} }
} }
} }
func (ms *MqTtMsgStream) findTimeTick(consumer mqclient.Consumer, // Save all msgs into chanMsgBuf[] till receive one ttMsg
eofMsgMap map[mqclient.Consumer]Timestamp, func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqclient.Consumer) {
wg *sync.WaitGroup, defer ms.chanWaitGroup.Done()
findMapMutex *sync.RWMutex) {
defer wg.Done()
for { for {
select { select {
case <-ms.ctx.Done(): case <-ms.ctx.Done():
return return
case <-ms.chanStopChan[consumer]:
return
case msg, ok := <-consumer.Chan(): case msg, ok := <-consumer.Chan():
if !ok { if !ok {
log.Debug("consumer closed!") log.Debug("consumer closed!")
@ -591,73 +615,54 @@ func (ms *MqTtMsgStream) findTimeTick(consumer mqclient.Consumer,
} }
consumer.Ack(msg) consumer.Ack(msg)
headerMsg := commonpb.MsgHeader{} tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
err := proto.Unmarshal(msg.Payload(), &headerMsg)
if err != nil { if err != nil {
log.Error("Failed to unmarshal message header", zap.Error(err)) log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
continue continue
} }
tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
if err != nil {
log.Error("Failed to unmarshal tsMsg", zap.Error(err))
continue
}
// set msg info to tsMsg
tsMsg.SetPosition(&MsgPosition{
ChannelName: filepath.Base(msg.Topic()),
MsgID: msg.ID().Serialize(),
})
sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
if ok { if ok {
tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp)) tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
} }
ms.unsolvedMutex.Lock() ms.chanMsgBufMutex.Lock()
ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
ms.unsolvedMutex.Unlock() ms.chanMsgBufMutex.Unlock()
if headerMsg.Base.MsgType == commonpb.MsgType_TimeTick { if tsMsg.Type() == commonpb.MsgType_TimeTick {
findMapMutex.Lock() ms.chanTtMsgTimeMutex.Lock()
eofMsgMap[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp ms.chanTtMsgTime[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp
findMapMutex.Unlock() ms.chanTtMsgTimeMutex.Unlock()
sp.Finish() sp.Finish()
return return
} }
sp.Finish() sp.Finish()
case <-ms.stopConsumeChan[consumer]:
return
} }
} }
} }
func checkTimeTickMsg(msg map[mqclient.Consumer]Timestamp, // return true only when all channels reach same timetick
isChannelReady map[mqclient.Consumer]bool, func (ms *MqTtMsgStream) allChanReachSameTtMsg(chanTtMsgSync map[mqclient.Consumer]bool) (Timestamp, bool) {
mu *sync.RWMutex) (Timestamp, bool) { tsMap := make(map[Timestamp]int)
checkMap := make(map[Timestamp]int)
var maxTime Timestamp = 0 var maxTime Timestamp = 0
for _, v := range msg { for _, t := range ms.chanTtMsgTime {
checkMap[v]++ tsMap[t]++
if v > maxTime { if t > maxTime {
maxTime = v maxTime = t
} }
} }
if len(checkMap) <= 1 { // when all channels reach same timetick, timeMap should contain only 1 timestamp
for consumer := range msg { if len(tsMap) <= 1 {
isChannelReady[consumer] = false for consumer := range ms.chanTtMsgTime {
chanTtMsgSync[consumer] = false
} }
return maxTime, true return maxTime, true
} }
for consumer := range msg { for consumer := range ms.chanTtMsgTime {
mu.RLock() ms.chanTtMsgTimeMutex.RLock()
v := msg[consumer] chanTtMsgSync[consumer] = (ms.chanTtMsgTime[consumer] == maxTime)
mu.RUnlock() ms.chanTtMsgTimeMutex.RUnlock()
if v != maxTime {
isChannelReady[consumer] = false
} else {
isChannelReady[consumer] = true
}
} }
return 0, false return 0, false
@ -747,7 +752,7 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
ChannelName: filepath.Base(msg.Topic()), ChannelName: filepath.Base(msg.Topic()),
MsgID: msg.ID().Serialize(), MsgID: msg.ID().Serialize(),
}) })
ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
} }
} }
} }

View File

@ -712,60 +712,77 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
outputStream.Close() outputStream.Close()
} }
func createMsgPacks(msgsInPack int, numOfMsgPack int, deltaTs int) []*MsgPack {
msgPacks := make([]*MsgPack, numOfMsgPack)
// generate MsgPack
for i := 0; i < numOfMsgPack; i++ {
if i%2 == 0 {
msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*deltaTs, (i/2+2)*deltaTs+2)
} else {
msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * deltaTs))
}
}
msgPacks = append(msgPacks, nil)
msgPacks = append(msgPacks, getTimeTickMsgPack(int64(numOfMsgPack*deltaTs)))
return msgPacks
}
func sendMsgPacks(ms MsgStream, msgPacks []*MsgPack) error {
log.Println("==============produce msg==================")
for i := 0; i < len(msgPacks); i++ {
printMsgPack(msgPacks[i])
if i%2 == 0 {
// insert msg use Produce
if err := ms.Produce(msgPacks[i]); err != nil {
return err
}
} else {
// tt msg use Broadcast
if err := ms.Broadcast(msgPacks[i]); err != nil {
return err
}
}
}
return nil
}
// //
// This testcase will generate MsgPacks as following: // This testcase will generate MsgPacks as following:
// //
// Insert Insert Insert Insert Insert Insert // Insert Insert Insert Insert Insert Insert
// |----------|----------|----------|----------|----------|----------| // c1 |----------|----------|----------|----------|----------|----------|
// ^ ^ ^ ^ ^ ^ // ^ ^ ^ ^ ^ ^
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) // TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
// //
// Insert Insert Insert Insert Insert Insert
// c2 |----------|----------|----------|----------|----------|----------|
// ^ ^ ^ ^ ^ ^
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
// Then check: // Then check:
// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be // 1. For each msg in MsgPack received by ttMsgStream consumer, there should be
// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs // msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs
// 2. The count of consumed msg should be equal to the count of produced msg // 2. The count of consumed msg should be equal to the count of produced msg
// //
func TestStream_PulsarTtMsgStream_1(t *testing.T) { func TestStream_PulsarTtMsgStream_1(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress") pulsarAddr, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) c1 := funcutil.RandomString(8)
producerChannels := []string{c1, c2} c2 := funcutil.RandomString(8)
p1Channels := []string{c1}
p2Channels := []string{c2}
consumerChannels := []string{c1, c2} consumerChannels := []string{c1, c2}
consumerSubName := funcutil.RandomString(8) consumerSubName := funcutil.RandomString(8)
const msgsInPack = 5 inputStream1 := getPulsarInputStream(pulsarAddr, p1Channels)
const numOfMsgPack = 10 msgPacks1 := createMsgPacks(3, 10, 10)
msgPacks := make([]*MsgPack, numOfMsgPack) assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))
// generate MsgPack inputStream2 := getPulsarInputStream(pulsarAddr, p2Channels)
for i := 0; i < numOfMsgPack; i++ { msgPacks2 := createMsgPacks(5, 10, 10)
if i%2 == 0 { assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2))
msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*10, i/2*10+22)
} else {
msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * 10))
}
}
msgPacks = append(msgPacks, nil)
msgPacks = append(msgPacks, getTimeTickMsgPack(100))
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
// produce msg
log.Println("==============produce msg==================")
for i := 0; i < len(msgPacks); i++ {
printMsgPack(msgPacks[i])
if i%2 == 0 {
// insert msg use Produce
err := inputStream.Produce(msgPacks[i])
assert.Nil(t, err)
} else {
// tt msg use Broadcast
err := inputStream.Broadcast(msgPacks[i])
assert.Nil(t, err)
}
}
// consume msg // consume msg
outputStream := getPulsarTtOutputStream(pulsarAddr, consumerChannels, consumerSubName)
log.Println("===============receive msg=================") log.Println("===============receive msg=================")
checkNMsgPack := func(t *testing.T, outputStream MsgStream, num int) int { checkNMsgPack := func(t *testing.T, outputStream MsgStream, num int) int {
rcvMsg := 0 rcvMsg := 0
@ -783,63 +800,48 @@ func TestStream_PulsarTtMsgStream_1(t *testing.T) {
} }
return rcvMsg return rcvMsg
} }
msgCount := checkNMsgPack(t, outputStream, len(msgPacks)/2) msgCount := checkNMsgPack(t, outputStream, len(msgPacks1)/2)
assert.Equal(t, (len(msgPacks)/2-1)*msgsInPack, msgCount) cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs)
cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs)
assert.Equal(t, (cnt1 + cnt2), msgCount)
inputStream.Close() inputStream1.Close()
inputStream2.Close()
outputStream.Close() outputStream.Close()
} }
// //
// This testcase will generate MsgPacks as following: // This testcase will generate MsgPacks as following:
// //
// Insert Insert Insert Insert Insert Insert // Insert Insert Insert Insert Insert Insert
// |----------|----------|----------|----------|----------|----------| // c1 |----------|----------|----------|----------|----------|----------|
// ^ ^ ^ ^ ^ ^ // ^ ^ ^ ^ ^ ^
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) // TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
// //
// Insert Insert Insert Insert Insert Insert
// c2 |----------|----------|----------|----------|----------|----------|
// ^ ^ ^ ^ ^ ^
// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100)
// Then check: // Then check:
// 1. ttMsgStream consumer can seek to the right position and resume // 1. ttMsgStream consumer can seek to the right position and resume
// 2. The count of consumed msg should be equal to the count of produced msg // 2. The count of consumed msg should be equal to the count of produced msg
// //
func TestStream_PulsarTtMsgStream_2(t *testing.T) { func TestStream_PulsarTtMsgStream_2(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress") pulsarAddr, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) c1 := funcutil.RandomString(8)
producerChannels := []string{c1, c2} c2 := funcutil.RandomString(8)
p1Channels := []string{c1}
p2Channels := []string{c2}
consumerChannels := []string{c1, c2} consumerChannels := []string{c1, c2}
consumerSubName := funcutil.RandomString(8) consumerSubName := funcutil.RandomString(8)
const msgsInPack = 5 inputStream1 := getPulsarInputStream(pulsarAddr, p1Channels)
const numOfMsgPack = 10 msgPacks1 := createMsgPacks(3, 10, 10)
msgPacks := make([]*MsgPack, numOfMsgPack) assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))
// generate MsgPack inputStream2 := getPulsarInputStream(pulsarAddr, p2Channels)
for i := 0; i < numOfMsgPack; i++ { msgPacks2 := createMsgPacks(5, 10, 10)
if i%2 == 0 { assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2))
msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*10, i/2*10+22)
} else {
msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * 10))
}
}
msgPacks = append(msgPacks, nil)
msgPacks = append(msgPacks, getTimeTickMsgPack(100))
inputStream := getPulsarInputStream(pulsarAddress, producerChannels)
// produce msg
log.Println("===============produce msg=================")
for i := 0; i < len(msgPacks); i++ {
printMsgPack(msgPacks[i])
if i%2 == 0 {
// insert msg use Produce
err := inputStream.Produce(msgPacks[i])
assert.Nil(t, err)
} else {
// tt msg use Broadcast
err := inputStream.Broadcast(msgPacks[i])
assert.Nil(t, err)
}
}
// consume msg // consume msg
log.Println("=============receive msg===================") log.Println("=============receive msg===================")
@ -849,9 +851,9 @@ func TestStream_PulsarTtMsgStream_2(t *testing.T) {
var outputStream MsgStream var outputStream MsgStream
msgCount := len(rcvMsgPacks) msgCount := len(rcvMsgPacks)
if msgCount == 0 { if msgCount == 0 {
outputStream = getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) outputStream = getPulsarTtOutputStream(pulsarAddr, consumerChannels, consumerSubName)
} else { } else {
outputStream = getPulsarTtOutputStreamAndSeek(pulsarAddress, rcvMsgPacks[msgCount-1].EndPositions) outputStream = getPulsarTtOutputStreamAndSeek(pulsarAddr, rcvMsgPacks[msgCount-1].EndPositions)
} }
msgPack := outputStream.Consume() msgPack := outputStream.Consume()
rcvMsgPacks = append(rcvMsgPacks, msgPack) rcvMsgPacks = append(rcvMsgPacks, msgPack)
@ -868,12 +870,15 @@ func TestStream_PulsarTtMsgStream_2(t *testing.T) {
} }
msgCount := 0 msgCount := 0
for i := 0; i < len(msgPacks)/2; i++ { for i := 0; i < len(msgPacks1)/2; i++ {
msgCount += resumeMsgPack(t) msgCount += resumeMsgPack(t)
} }
assert.Equal(t, (len(msgPacks)/2-1)*msgsInPack, msgCount) cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs)
cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs)
assert.Equal(t, (cnt1 + cnt2), msgCount)
inputStream.Close() inputStream1.Close()
inputStream2.Close()
} }
/****************************************Rmq test******************************************/ /****************************************Rmq test******************************************/

View File

@ -64,9 +64,9 @@ func (tt *TimeTick) Start() error {
}, },
} }
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
for _, msg := range msgPack.Msgs { //for _, msg := range msgPack.Msgs {
log.Debug("proxyservice", zap.Stringer("msg type", msg.Type())) // log.Debug("proxyservice", zap.Stringer("msg type", msg.Type()))
} //}
for _, channel := range tt.channels { for _, channel := range tt.channels {
err = channel.Broadcast(&msgPack) err = channel.Broadcast(&msgPack)
if err != nil { if err != nil {