fix: unstable ut and fix goroutine leak (#29624)

fix unstable ut and fix goroutine leak
related: #27801

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/29138/merge
smellthemoon 2024-01-04 17:24:47 +08:00 committed by GitHub
parent a3aff37f73
commit a988daf143
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 7 deletions

View File

@ -65,6 +65,7 @@ func TestClient_CreateProducer(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client1"
rmq := newRocksMQ(t, rmqPathTest)
defer rmq.Close()
defer removePath(rmqPath)
client1, err := NewClient(Options{
Server: rmq,
@ -105,6 +106,7 @@ func TestClient_Subscribe(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client2"
rmq := newRocksMQ(t, rmqPathTest)
defer rmq.Close()
defer removePath(rmqPath)
client1, err := NewClient(Options{
Server: rmq,
@ -169,6 +171,7 @@ func TestClient_SeekLatest(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/seekLatest"
rmq := newRocksMQ(t, rmqPathTest)
defer rmq.Close()
defer removePath(rmqPath)
client, err := NewClient(Options{
Server: rmq,
@ -243,6 +246,7 @@ func TestClient_consume(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client3"
rmq := newRocksMQ(t, rmqPathTest)
defer rmq.Close()
defer removePath(rmqPath)
client, err := NewClient(Options{
Server: rmq,
@ -281,6 +285,7 @@ func TestRocksmq_Properties(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client4"
rmq := newRocksMQ(t, rmqPathTest)
defer rmq.Close()
defer removePath(rmqPath)
client, err := NewClient(Options{
Server: rmq,
@ -325,13 +330,6 @@ func TestRocksmq_Properties(t *testing.T) {
_, err = producer.Send(msg)
assert.NoError(t, err)
msg = &mqwrapper.ProducerMessage{
Payload: msgb,
Properties: map[string]string{common.TraceIDKey: "b"},
}
_, err = producer.Send(msg)
assert.NoError(t, err)
msgChan := consumer.Chan()
msgConsume, ok := <-msgChan
assert.True(t, ok)
@ -339,6 +337,16 @@ func TestRocksmq_Properties(t *testing.T) {
assert.Equal(t, msgConsume.Properties()[common.TraceIDKey], "a")
assert.NoError(t, err)
// rocksmq consumer needs produce to notify to receive msg
// if produce all in the begin, it will stuck if consume not that fast
// related with https://github.com/milvus-io/milvus/issues/27801
msg = &mqwrapper.ProducerMessage{
Payload: msgb,
Properties: map[string]string{common.TraceIDKey: "b"},
}
_, err = producer.Send(msg)
assert.NoError(t, err)
msgConsume, ok = <-msgChan
assert.True(t, ok)
assert.Equal(t, len(msgConsume.Properties()), 1)

View File

@ -53,6 +53,7 @@ func TestConsumer_newConsumer(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_consumer1"
rmq := newRocksMQ(t, rmqPathTest)
defer rmq.Close()
defer removePath(rmqPath)
client, err := newClient(Options{
Server: rmq,
@ -124,6 +125,7 @@ func TestConsumer_Seek(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_consumer2"
rmq := newRocksMQ(t, rmqPathTest)
defer rmq.Close()
defer removePath(rmqPath)
client, err := newClient(Options{
Server: rmq,