Fix RocksMQ flaky test (#18954)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/18975/head
Xiaofan 2022-09-01 19:46:59 +08:00 committed by GitHub
parent c054873908
commit 10d1140885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 11 deletions

View File

@ -46,13 +46,15 @@ func TestRmqRetention_Basic(t *testing.T) {
var params paramtable.BaseTable
params.Init()
checkTimeInterval := 2
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, int64(checkTimeInterval))
rmq, err := NewRocksMQ(params, rocksdbPath, nil)
assert.Nil(t, err)
defer rmq.Close()
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
@ -89,7 +91,6 @@ func TestRmqRetention_Basic(t *testing.T) {
}
assert.Equal(t, len(cMsgs), msgNum)
checkTimeInterval := 2
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
// Seek to a previous consumed message, the message should be clean up
err = rmq.Seek(topicName, groupName, cMsgs[msgNum/2].MsgID)
@ -139,14 +140,14 @@ func TestRmqRetention_NotConsumed(t *testing.T) {
var params paramtable.BaseTable
params.Init()
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
rmq, err := NewRocksMQ(params, rocksdbPath, nil)
assert.Nil(t, err)
defer rmq.Close()
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
@ -245,6 +246,8 @@ func TestRmqRetention_MultipleTopic(t *testing.T) {
metaPath := retentionPath + "meta_multi_topic"
os.RemoveAll(metaPath)
var params paramtable.BaseTable
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
@ -254,8 +257,6 @@ func TestRmqRetention_MultipleTopic(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
// retention by secs
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
@ -467,6 +468,8 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
var params paramtable.BaseTable
params.Init()
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -475,8 +478,6 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
// retention by secs
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 5)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
@ -591,6 +592,8 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
var params paramtable.BaseTable
params.Init()
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -598,8 +601,6 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
// update some configrocksmq_retentions to make cleanup trigger faster
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
topicName := "topic_a"
err = rmq.CreateTopic(topicName)