fix: Fix unstable TestDispatchToVchannels ut (#35163)

issue: https://github.com/milvus-io/milvus/issues/35150

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/35172/head
yihao.dai 2024-08-01 14:56:13 +08:00 committed by GitHub
parent 5bbb1c201c
commit 7721a28584
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 15 deletions

View File

@ -189,7 +189,7 @@ func (suite *SimulationSuite) SetupTest() {
go suite.manager.Run()
}
func (suite *SimulationSuite) produceMsg(wg *sync.WaitGroup) {
func (suite *SimulationSuite) produceMsg(wg *sync.WaitGroup, collectionID int64) {
defer wg.Done()
const timeTickCount = 100
@ -223,7 +223,7 @@ func (suite *SimulationSuite) produceMsg(wg *sync.WaitGroup) {
ddlNum := rand.Intn(2)
for j := 0; j < ddlNum; j++ {
err := suite.producer.Produce(&msgstream.MsgPack{
Msgs: []msgstream.TsMsg{genDDLMsg(commonpb.MsgType_DropCollection)},
Msgs: []msgstream.TsMsg{genDDLMsg(commonpb.MsgType_DropCollection, collectionID)},
})
assert.NoError(suite.T(), err)
for k := range suite.vchannels {
@ -293,10 +293,13 @@ func (suite *SimulationSuite) TestDispatchToVchannels() {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
const vchannelNum = 10
const (
vchannelNum = 10
collectionID int64 = 1234
)
suite.vchannels = make(map[string]*vchannelHelper, vchannelNum)
for i := 0; i < vchannelNum; i++ {
vchannel := fmt.Sprintf("%s_vchannelv%d", suite.pchannel, i)
vchannel := fmt.Sprintf("%s_%dv%d", suite.pchannel, collectionID, i)
output, err := suite.manager.Add(context.Background(), vchannel, nil, common.SubscriptionPositionEarliest)
assert.NoError(suite.T(), err)
suite.vchannels[vchannel] = &vchannelHelper{output: output}
@ -304,18 +307,19 @@ func (suite *SimulationSuite) TestDispatchToVchannels() {
wg := &sync.WaitGroup{}
wg.Add(1)
go suite.produceMsg(wg)
go suite.produceMsg(wg, collectionID)
wg.Wait()
for vchannel := range suite.vchannels {
wg.Add(1)
go suite.consumeMsg(ctx, wg, vchannel)
}
wg.Wait()
for _, helper := range suite.vchannels {
assert.Equal(suite.T(), helper.pubInsMsgNum, helper.subInsMsgNum)
assert.Equal(suite.T(), helper.pubDelMsgNum, helper.subDelMsgNum)
assert.Equal(suite.T(), helper.pubDDLMsgNum, helper.subDDLMsgNum)
assert.Equal(suite.T(), helper.pubPackNum, helper.subPackNum)
for vchannel, helper := range suite.vchannels {
msg := fmt.Sprintf("vchannel=%s", vchannel)
assert.Equal(suite.T(), helper.pubInsMsgNum, helper.subInsMsgNum, msg)
assert.Equal(suite.T(), helper.pubDelMsgNum, helper.subDelMsgNum, msg)
assert.Equal(suite.T(), helper.pubDDLMsgNum, helper.subDDLMsgNum, msg)
assert.Equal(suite.T(), helper.pubPackNum, helper.subPackNum, msg)
}
}

View File

@ -148,34 +148,38 @@ func genDeleteMsg(numRows int, vchannel string, msgID typeutil.UniqueID) *msgstr
}
}
func genDDLMsg(msgType commonpb.MsgType) msgstream.TsMsg {
func genDDLMsg(msgType commonpb.MsgType, collectionID int64) msgstream.TsMsg {
switch msgType {
case commonpb.MsgType_CreateCollection:
return &msgstream.CreateCollectionMsg{
BaseMsg: msgstream.BaseMsg{HashValues: []uint32{0}},
CreateCollectionRequest: &msgpb.CreateCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
CollectionID: collectionID,
},
}
case commonpb.MsgType_DropCollection:
return &msgstream.DropCollectionMsg{
BaseMsg: msgstream.BaseMsg{HashValues: []uint32{0}},
DropCollectionRequest: &msgpb.DropCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection},
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection},
CollectionID: collectionID,
},
}
case commonpb.MsgType_CreatePartition:
return &msgstream.CreatePartitionMsg{
BaseMsg: msgstream.BaseMsg{HashValues: []uint32{0}},
CreatePartitionRequest: &msgpb.CreatePartitionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreatePartition},
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreatePartition},
CollectionID: collectionID,
},
}
case commonpb.MsgType_DropPartition:
return &msgstream.DropPartitionMsg{
BaseMsg: msgstream.BaseMsg{HashValues: []uint32{0}},
DropPartitionRequest: &msgpb.DropPartitionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition},
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition},
CollectionID: collectionID,
},
}
}