Improve unit test in mq_msgstream.go (#8427)

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
pull/8427/merge
Xiangyu Wang 2021-09-23 20:07:55 +08:00 committed by GitHub
parent 7eccfb4ede
commit 884d0d672e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 252 additions and 251 deletions

View File

@ -42,257 +42,6 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}
func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
keys := hashKeys[i]
for _, channelID := range keys {
_, ok := result[channelID]
if ok == false {
msgPack := MsgPack{}
result[channelID] = &msgPack
}
result[channelID].Msgs = append(result[channelID].Msgs, request)
}
}
return result, nil
}
func getTsMsg(msgType MsgType, reqID UniqueID) TsMsg {
hashValue := uint32(reqID)
time := uint64(reqID)
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
switch msgType {
case commonpb.MsgType_Insert:
insertRequest := internalpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
CollectionName: "Collection",
PartitionName: "Partition",
SegmentID: 1,
ChannelID: "0",
Timestamps: []Timestamp{time},
RowIDs: []int64{1},
RowData: []*commonpb.Blob{{}},
}
insertMsg := &InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
return insertMsg
case commonpb.MsgType_Delete:
deleteRequest := internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
CollectionName: "Collection",
ChannelID: "1",
Timestamps: []Timestamp{1},
PrimaryKeys: []IntPrimaryKey{1},
}
deleteMsg := &DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
return deleteMsg
case commonpb.MsgType_Search:
searchRequest := internalpb.SearchRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Search,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
ResultChannelID: "0",
}
searchMsg := &SearchMsg{
BaseMsg: baseMsg,
SearchRequest: searchRequest,
}
return searchMsg
case commonpb.MsgType_SearchResult:
searchResult := internalpb.SearchResults{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SearchResult,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ResultChannelID: "0",
}
searchResultMsg := &SearchResultMsg{
BaseMsg: baseMsg,
SearchResults: searchResult,
}
return searchResultMsg
case commonpb.MsgType_TimeTick:
timeTickResult := internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
case commonpb.MsgType_QueryNodeStats:
queryNodeSegStats := internalpb.QueryNodeStats{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_QueryNodeStats,
SourceID: reqID,
},
}
queryNodeSegStatsMsg := &QueryNodeStatsMsg{
BaseMsg: baseMsg,
QueryNodeStats: queryNodeSegStats,
}
return queryNodeSegStatsMsg
}
return nil
}
func getTimeTickMsg(reqID UniqueID) TsMsg {
hashValue := uint32(reqID)
time := uint64(reqID)
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
timeTickResult := internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
}
// Generate MsgPack contains 'num' msgs, with timestamp in (start, end)
func getRandInsertMsgPack(num int, start int, end int) *MsgPack {
Rand := rand.New(rand.NewSource(time.Now().UnixNano()))
set := make(map[int]bool)
msgPack := MsgPack{}
for len(set) < num {
reqID := Rand.Int()%(end-start-1) + start + 1
_, ok := set[reqID]
if !ok {
set[reqID] = true
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(reqID)))
}
}
return &msgPack
}
func getInsertMsgPack(ts []int) *MsgPack {
msgPack := MsgPack{}
for i := 0; i < len(ts); i++ {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(ts[i])))
}
return &msgPack
}
func getTimeTickMsgPack(reqID UniqueID) *MsgPack {
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(reqID))
return &msgPack
}
func getPulsarInputStream(pulsarAddress string, producerChannels []string, opts ...RepackFunc) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
inputStream.Start()
return inputStream
}
func getPulsarOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
return outputStream
}
func getPulsarTtOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
return outputStream
}
func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPosition) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
consumerName := []string{}
for _, c := range positions {
consumerName = append(consumerName, c.ChannelName)
}
outputStream.AsConsumer(consumerName, positions[0].MsgGroup)
outputStream.Seek(positions)
outputStream.Start()
return outputStream
}
func receiveMsg(outputStream MsgStream, msgCount int) {
receiveCount := 0
for {
result := outputStream.Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
log.Println("msg type: ", v.Type(), ", msg value: ", v)
}
log.Println("================")
}
if receiveCount >= msgCount {
break
}
}
}
func printMsgPack(msgPack *MsgPack) {
if msgPack == nil {
log.Println("msg nil")
} else {
for _, v := range msgPack.Msgs {
log.Println("msg type: ", v.Type(), ", msg value: ", v)
}
}
log.Println("================")
}
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
@ -1156,3 +905,255 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
receiveMsg(outputStream, len(msgPack1.Msgs))
Close(rocksdbName, inputStream, outputStream, etcdKV)
}
/* ========================== Utility functions ========================== */
func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
keys := hashKeys[i]
for _, channelID := range keys {
_, ok := result[channelID]
if ok == false {
msgPack := MsgPack{}
result[channelID] = &msgPack
}
result[channelID].Msgs = append(result[channelID].Msgs, request)
}
}
return result, nil
}
func getTsMsg(msgType MsgType, reqID UniqueID) TsMsg {
hashValue := uint32(reqID)
time := uint64(reqID)
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
switch msgType {
case commonpb.MsgType_Insert:
insertRequest := internalpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
CollectionName: "Collection",
PartitionName: "Partition",
SegmentID: 1,
ChannelID: "0",
Timestamps: []Timestamp{time},
RowIDs: []int64{1},
RowData: []*commonpb.Blob{{}},
}
insertMsg := &InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
return insertMsg
case commonpb.MsgType_Delete:
deleteRequest := internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
CollectionName: "Collection",
ChannelID: "1",
Timestamps: []Timestamp{1},
PrimaryKeys: []IntPrimaryKey{1},
}
deleteMsg := &DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
return deleteMsg
case commonpb.MsgType_Search:
searchRequest := internalpb.SearchRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Search,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
ResultChannelID: "0",
}
searchMsg := &SearchMsg{
BaseMsg: baseMsg,
SearchRequest: searchRequest,
}
return searchMsg
case commonpb.MsgType_SearchResult:
searchResult := internalpb.SearchResults{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SearchResult,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ResultChannelID: "0",
}
searchResultMsg := &SearchResultMsg{
BaseMsg: baseMsg,
SearchResults: searchResult,
}
return searchResultMsg
case commonpb.MsgType_TimeTick:
timeTickResult := internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: reqID,
Timestamp: 1,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
case commonpb.MsgType_QueryNodeStats:
queryNodeSegStats := internalpb.QueryNodeStats{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_QueryNodeStats,
SourceID: reqID,
},
}
queryNodeSegStatsMsg := &QueryNodeStatsMsg{
BaseMsg: baseMsg,
QueryNodeStats: queryNodeSegStats,
}
return queryNodeSegStatsMsg
}
return nil
}
func getTimeTickMsg(reqID UniqueID) TsMsg {
hashValue := uint32(reqID)
time := uint64(reqID)
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
}
timeTickResult := internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
return timeTickMsg
}
// Generate MsgPack contains 'num' msgs, with timestamp in (start, end)
func getRandInsertMsgPack(num int, start int, end int) *MsgPack {
Rand := rand.New(rand.NewSource(time.Now().UnixNano()))
set := make(map[int]bool)
msgPack := MsgPack{}
for len(set) < num {
reqID := Rand.Int()%(end-start-1) + start + 1
_, ok := set[reqID]
if !ok {
set[reqID] = true
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(reqID)))
}
}
return &msgPack
}
func getInsertMsgPack(ts []int) *MsgPack {
msgPack := MsgPack{}
for i := 0; i < len(ts); i++ {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(ts[i])))
}
return &msgPack
}
func getTimeTickMsgPack(reqID UniqueID) *MsgPack {
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(reqID))
return &msgPack
}
func getPulsarInputStream(pulsarAddress string, producerChannels []string, opts ...RepackFunc) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
inputStream.Start()
return inputStream
}
func getPulsarOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
return outputStream
}
func getPulsarTtOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
return outputStream
}
func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPosition) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
consumerName := []string{}
for _, c := range positions {
consumerName = append(consumerName, c.ChannelName)
}
outputStream.AsConsumer(consumerName, positions[0].MsgGroup)
outputStream.Seek(positions)
outputStream.Start()
return outputStream
}
func receiveMsg(outputStream MsgStream, msgCount int) {
receiveCount := 0
for {
result := outputStream.Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
log.Println("msg type: ", v.Type(), ", msg value: ", v)
}
log.Println("================")
}
if receiveCount >= msgCount {
break
}
}
}
func printMsgPack(msgPack *MsgPack) {
if msgPack == nil {
log.Println("msg nil")
} else {
for _, v := range msgPack.Msgs {
log.Println("msg type: ", v.Type(), ", msg value: ", v)
}
}
log.Println("================")
}