Improve rocksmq code coverage (#7483)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/7515/head
yukun 2021-09-06 19:36:42 +08:00 committed by GitHub
parent 90e30726a4
commit 7025a6e925
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 424 additions and 75 deletions

View File

@ -14,6 +14,7 @@ package rocksmq
import (
"os"
"sync"
"sync/atomic"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/log"
@ -56,8 +57,8 @@ func InitRocksMQ() error {
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
RocksmqRetentionTimeInMinutes = params.ParseInt64("rocksmq.retentionTimeInMinutes")
RocksmqRetentionSizeInMB = params.ParseInt64("rocksmq.retentionSizeInMB")
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, params.ParseInt64("rocksmq.retentionTimeInMinutes"))
atomic.StoreInt64(&RocksmqRetentionSizeInMB, params.ParseInt64("rocksmq.retentionSizeInMB"))
log.Debug("Rocksmq retention: ", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes), zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB))
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
if err != nil {

View File

@ -29,11 +29,12 @@ import (
type UniqueID = typeutil.UniqueID
var RocksmqPageSize int64 = 2 << 30
const (
DefaultMessageID = "-1"
FixedChannelNameLen = 320
RocksDBLRUCacheCapacity = 3 << 30
RocksmqPageSize = 2 << 30
kvSuffix = "_meta_kv"
@ -275,19 +276,6 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) {
key := groupName + "/" + topicName + "/current_id"
// keyExist := false
// if ll, ok := topicMu.Load(topicName); !ok {
// keyExist = rmq.checkKeyExist(key)
// } else {
// if lock, lok := ll.(*sync.Mutex); lok {
// lock.Lock()
// defer lock.Unlock()
// keyExist = rmq.checkKeyExist(key)
// } else {
// keyExist = rmq.checkKeyExist(key)
// }
// }
if rmq.checkKeyExist(key) {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
@ -376,22 +364,23 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
}
if UniqueID(msgLen) != idEnd-idStart {
log.Debug("RocksMQ: Obtained id length is not equal that of message")
return errors.New("Obtained id length is not equal that of message")
}
/* Step I: Insert data to store system */
batch := gorocksdb.NewWriteBatch()
msgSizes := make(map[UniqueID]int64)
msgIDs := make([]UniqueID, msgLen)
for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
key, err := combKey(topicName, idStart+UniqueID(i))
msgID := idStart + UniqueID(i)
key, err := combKey(topicName, msgID)
if err != nil {
log.Debug("RocksMQ: combKey(" + topicName + "," + strconv.FormatInt(idStart+UniqueID(i), 10) + ")")
return err
}
batch.Put([]byte(key), messages[i].Payload)
msgSizes[idStart+UniqueID(i)] = int64(len(messages[i].Payload))
msgIDs[i] = msgID
msgSizes[msgID] = int64(len(messages[i].Payload))
}
err = rmq.store.Write(gorocksdb.NewDefaultWriteOptions(), batch)
@ -438,14 +427,14 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
// Update message page info
// TODO(yukun): Should this be in a go routine
err = rmq.UpdatePageInfo(topicName, msgSizes)
err = rmq.UpdatePageInfo(topicName, msgIDs, msgSizes)
if err != nil {
return err
}
return nil
}
func (rmq *rocksmq) UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64) error {
func (rmq *rocksmq) UpdatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {
msgSizeKey := MessageSizeTitle + topicName
msgSizeVal, err := rmq.kv.Load(msgSizeKey)
if err != nil {
@ -459,11 +448,12 @@ func (rmq *rocksmq) UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64
if err != nil {
return err
}
for k, v := range msgSizes {
if curMsgSize+v > RocksmqPageSize {
for _, id := range msgIDs {
msgSize := msgSizes[id]
if curMsgSize+msgSize > RocksmqPageSize {
// Current page is full
newPageSize := curMsgSize + v
pageEndID := k
newPageSize := curMsgSize + msgSize
pageEndID := id
// Update page message size for current page. key is page end ID
pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
err := rmq.kv.Save(pageMsgSizeKey, strconv.FormatInt(newPageSize, 10))
@ -484,7 +474,7 @@ func (rmq *rocksmq) UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64
}
curMsgSize = 0
} else {
curMsgSize += v
curMsgSize += msgSize
// Update message size to current message size
err := rmq.kv.Save(msgSizeKey, strconv.FormatInt(curMsgSize, 10))
if err != nil {
@ -629,7 +619,7 @@ func (rmq *rocksmq) Notify(topicName, groupName string) {
}
func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error {
ll, ok := rmq.ackedMu.Load(topicName)
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("topic name = %s not exist", topicName)
}
@ -685,6 +675,11 @@ func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID,
if err != nil {
return err
}
if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
ackedInfo := info.(*topicAckedInfo)
ackedInfo.ackedTs[minBeginID] = ts
rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
}
if minBeginID == newID {
// Means the begin_id of topic update to newID, so needs to update acked size
ackedSizeKey := AckedSizeTitle + topicName

View File

@ -21,11 +21,26 @@ import (
"time"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/util/paramtable"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/stretchr/testify/assert"
)
var Params paramtable.BaseTable
var rmqPath string = "/tmp/rocksmq"
func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
if err != nil {
panic(err)
}
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
return idAllocator
}
func TestFixChannelName(t *testing.T) {
name := "abcd"
fixName, err := fixChannelName(name)
@ -42,23 +57,113 @@ func etcdEndpoints() []string {
return etcdEndpoints
}
func TestRocksMQ(t *testing.T) {
ep := etcdEndpoints()
etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root")
assert.Nil(t, err)
defer etcdKV.Close()
func TestInitRmq(t *testing.T) {
name := "/tmp/rmq_init"
endpoints := os.Getenv("ETCD_ENDPOINTS")
if endpoints == "" {
endpoints = "localhost:2379"
}
etcdEndpoints := strings.Split(endpoints, ",")
etcdKV, err := etcdkv.NewEtcdKV(etcdEndpoints, "/etcd/test/root")
if err != nil {
log.Fatalf("New clientv3 error = %v", err)
}
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq"
_ = os.RemoveAll(name)
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
err = InitRmq(name, idAllocator)
defer Rmq.stopRetention()
assert.NoError(t, err)
defer CloseRocksMQ()
}
func TestGlobalRmq(t *testing.T) {
// Params.Init()
rmqPath := "/tmp/milvus/rdb_data_global"
os.Setenv("ROCKSMQ_PATH", rmqPath)
defer os.RemoveAll(rmqPath)
err := InitRocksMQ()
defer Rmq.stopRetention()
assert.NoError(t, err)
defer CloseRocksMQ()
}
func TestRegisterConsumer(t *testing.T) {
kvPath := rmqPath + "_kv_register"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := path + "_db_register"
defer os.RemoveAll(rocksdbPath)
metaPath := path + "_meta_kv_register"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.stopRetention()
topicName := "topic_register"
groupName := "group_register"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
assert.Nil(t, err)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
MsgMutex: make(chan struct{}),
}
rmq.RegisterConsumer(consumer)
exist, _ := rmq.ExistConsumerGroup(topicName, groupName)
assert.Equal(t, exist, true)
dummyGrpName := "group_dummy"
exist, _ = rmq.ExistConsumerGroup(topicName, dummyGrpName)
assert.Equal(t, exist, false)
msgA := "a_message"
pMsgs := make([]ProducerMessage, 1)
pMsgA := ProducerMessage{Payload: []byte(msgA)}
pMsgs[0] = pMsgA
_ = idAllocator.UpdateID()
err = rmq.Produce(topicName, pMsgs)
assert.Error(t, err)
rmq.Notify(topicName, groupName)
consumer1 := &Consumer{
Topic: topicName,
GroupName: groupName,
MsgMutex: make(chan struct{}),
}
rmq.RegisterConsumer(consumer1)
groupName2 := "group_register2"
consumer2 := &Consumer{
Topic: topicName,
GroupName: groupName2,
MsgMutex: make(chan struct{}),
}
rmq.RegisterConsumer(consumer2)
err = rmq.DestroyConsumerGroup(topicName, groupName)
assert.NoError(t, err)
}
func TestRocksMQ(t *testing.T) {
kvPath := rmqPath + "_kv_rmq"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := path + "_db_rmq"
defer os.RemoveAll(rocksdbPath)
metaPath := path + "_meta_kv_rmq"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
channelName := "channel_a"
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
@ -96,7 +201,56 @@ func TestRocksMQ(t *testing.T) {
assert.Equal(t, len(cMsgs), 2)
assert.Equal(t, string(cMsgs[0].Payload), "b_message")
assert.Equal(t, string(cMsgs[1].Payload), "c_message")
}
func TestRocksMQDummy(t *testing.T) {
kvPath := rmqPath + "_kv_dummy"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := path + "_db_dummy"
defer os.RemoveAll(rocksdbPath)
metaPath := path + "_meta_kv_dummy"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
channelName := "channel_a"
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
defer rmq.DestroyTopic(channelName)
err = rmq.CreateTopic(channelName)
assert.NoError(t, err)
channelName1 := "channel_dummy"
err = rmq.DestroyTopic(channelName1)
assert.NoError(t, err)
err = rmq.DestroyConsumerGroup(channelName, channelName1)
assert.NoError(t, err)
err = rmq.Produce(channelName, nil)
assert.Error(t, err)
err = rmq.Produce(channelName1, nil)
assert.Error(t, err)
groupName1 := "group_dummy"
err = rmq.Seek(channelName1, groupName1, 0)
assert.Error(t, err)
rmq.stopRetention()
channelName2 := strings.Repeat(channelName1, 100)
err = rmq.CreateTopic(string(channelName2))
assert.NoError(t, err)
err = rmq.Produce(string(channelName2), nil)
assert.Error(t, err)
msgA := "a_message"
pMsgs := make([]ProducerMessage, 1)
pMsgA := ProducerMessage{Payload: []byte(msgA)}
pMsgs[0] = pMsgA
}
func TestRocksMQ_Loop(t *testing.T) {
@ -115,6 +269,7 @@ func TestRocksMQ_Loop(t *testing.T) {
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
loopNum := 100
channelName := "channel_test"
@ -164,7 +319,6 @@ func TestRocksMQ_Loop(t *testing.T) {
cMsgs, err = rmq.Consume(channelName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 0)
rmq.stopRetention()
}
func TestRocksMQ_Goroutines(t *testing.T) {
@ -182,6 +336,7 @@ func TestRocksMQ_Goroutines(t *testing.T) {
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
loopNum := 100
channelName := "channel_test"
@ -225,7 +380,6 @@ func TestRocksMQ_Goroutines(t *testing.T) {
}(&wg, rmq)
}
wg.Wait()
rmq.stopRetention()
}
/**
@ -253,6 +407,7 @@ func TestRocksMQ_Throughout(t *testing.T) {
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
channelName := "channel_throughout_test"
err = rmq.CreateTopic(channelName)
@ -289,7 +444,6 @@ func TestRocksMQ_Throughout(t *testing.T) {
ct1 := time.Now().UnixNano() / int64(time.Millisecond)
cDuration := ct1 - ct0
log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration)
rmq.stopRetention()
}
func TestRocksMQ_MultiChan(t *testing.T) {
@ -307,6 +461,7 @@ func TestRocksMQ_MultiChan(t *testing.T) {
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
channelName0 := "chan01"
channelName1 := "chan11"
@ -338,5 +493,4 @@ func TestRocksMQ_MultiChan(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 1)
assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0))
rmq.stopRetention()
}

View File

@ -16,6 +16,7 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
@ -27,7 +28,6 @@ import (
var RocksmqRetentionTimeInMinutes int64
var RocksmqRetentionSizeInMB int64
var TickerTimeInMinutes int64 = 1
var CheckTimeInterval int64 = 6
const (
MB = 2 << 20
@ -194,10 +194,6 @@ func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) {
log.Debug("PrefixLoad failed", zap.Any("error", err))
return
}
if len(keys) != len(vals) {
log.Debug("LoadWithPrefix return unequal value length of keys and values")
return
}
for i, key := range keys {
offset := FixedChannelNameLen + 1
@ -260,13 +256,13 @@ func (ri *retentionInfo) retention() error {
return nil
case t := <-ticker.C:
timeNow := t.Unix()
checkTime := RocksmqRetentionTimeInMinutes * MINUTE / 10
log.Debug("A retention triggered by time ticker: ", zap.Any("ticker", timeNow))
checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * 60 / 10
log.Debug("In ticker: ", zap.Any("ticker", timeNow))
ri.lastRetentionTime.Range(func(k, v interface{}) bool {
if v.(int64)+checkTime < timeNow {
err := ri.expiredCleanUp(k.(string))
if err != nil {
panic(err)
log.Warn("Retention expired clean failed", zap.Any("error", err))
}
}
return true
@ -360,11 +356,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
break
}
}
if endID == 0 {
log.Debug("All messages are not expired", zap.Any("topic", topic))
return nil
}
log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
// if endID == 0 {
// log.Debug("All messages are not expired")
// return nil
// }
// Delete page message size in rocksdb_kv
if pageInfo != nil {
@ -406,6 +402,10 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
}
ri.pageInfo.Store(topic, pageInfo)
}
if endID == 0 {
log.Debug("All messages are not expired")
return nil
}
log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
// Delete acked_ts in rocksdb_kv
@ -483,9 +483,9 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err
}
func msgTimeExpiredCheck(ackedTs int64) bool {
return ackedTs+RocksmqRetentionTimeInMinutes*MINUTE < time.Now().Unix()
return ackedTs+atomic.LoadInt64(&RocksmqRetentionTimeInMinutes)*MINUTE < time.Now().Unix()
}
func msgSizeExpiredCheck(deletedAckedSize, ackedSize int64) bool {
return ackedSize-deletedAckedSize > RocksmqRetentionSizeInMB*MB
return ackedSize-deletedAckedSize > atomic.LoadInt64(&RocksmqRetentionSizeInMB)*MB
}

View File

@ -14,38 +14,34 @@ package rocksmq
import (
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/milvus-io/milvus/internal/allocator"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
if err != nil {
panic(err)
}
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
return idAllocator
}
var path string = "/tmp/rmq_retention"
func TestRmqRetention(t *testing.T) {
//RocksmqRetentionSizeInMB = 0
//RocksmqRetentionTimeInMinutes = 0
kvPath := "/tmp/rocksmq_idAllocator_kv"
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
kvPath := path + "_kv"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := "/tmp/rocksmq_test"
rocksdbPath := path + "_db"
defer os.RemoveAll(rocksdbPath)
metaPath := rocksdbPath + "_meta_kv"
metaPath := path + "_meta_kv"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
@ -81,7 +77,8 @@ func TestRmqRetention(t *testing.T) {
}
assert.Equal(t, len(cMsgs), msgNum)
time.Sleep(time.Duration(CheckTimeInterval+1) * time.Second)
checkTimeInterval := 6
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
// Seek to a previous consumed message, the message should be clean up
err = rmq.Seek(topicName, groupName, cMsgs[msgNum/2].MsgID)
assert.Nil(t, err)
@ -89,3 +86,205 @@ func TestRmqRetention(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(newRes), 0)
}
func TestLoadRetentionInfo(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqPageSize, 100)
kvPath := path + "_kv_load"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := path + "_db_load"
defer os.RemoveAll(rocksdbPath)
metaPath := path + "_meta_load"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
rmq.retentionInfo.ackedInfo.Delete(topicName)
msgNum := 100
pMsgs := make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
var wg sync.WaitGroup
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
assert.Nil(t, err)
cMsgs := make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
wg.Add(1)
ll, ok := topicMu.Load(topicName)
assert.Equal(t, ok, true)
lock, _ := ll.(*sync.Mutex)
lock.Lock()
defer lock.Unlock()
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
initRetentionInfo(rmq.retentionInfo.kv, rmq.store)
dummyTopic := strings.Repeat(topicName, 100)
err = DeleteMessages(rmq.store, dummyTopic, 0, 0)
assert.Error(t, err)
err = DeleteMessages(rmq.store, topicName, 0, 0)
assert.NoError(t, err)
}
func TestComplexRmqRetention(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 1)
atomic.StoreInt64(&RocksmqPageSize, 10)
kvPath := path + "_kv_com"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := path + "_db_com"
defer os.RemoveAll(rocksdbPath)
metaPath := path + "_meta_kv_com"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
msgNum := 100
pMsgs := make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
assert.Nil(t, err)
cMsgs := make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
checkTimeInterval := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10
time.Sleep(time.Duration(checkTimeInterval*2) * time.Second)
// Seek to a previous consumed message, the message should be clean up
log.Debug("cMsg", zap.Any("id", cMsgs[10].MsgID))
err = rmq.Seek(topicName, groupName, cMsgs[10].MsgID)
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.NotEqual(t, newRes[0].MsgID, cMsgs[11].MsgID)
}
func TestRmqRetentionPageTimeExpire(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
atomic.StoreInt64(&RocksmqPageSize, 10)
kvPath := path + "_kv_com1"
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := path + "_db_com1"
defer os.RemoveAll(rocksdbPath)
metaPath := path + "_meta_kv_com1"
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.stopRetention()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
msgNum := 100
pMsgs := make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
assert.Nil(t, err)
cMsgs := make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
checkTimeInterval := 7
time.Sleep(time.Duration(checkTimeInterval) * time.Second)
// Seek to a previous consumed message, the message should be clean up
log.Debug("cMsg", zap.Any("id", cMsgs[10].MsgID))
err = rmq.Seek(topicName, groupName, cMsgs[len(cMsgs)/2].MsgID)
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 0)
// assert.NotEqual(t, newRes[0].MsgID, cMsgs[11].MsgID)
}