Fix RocksMQ UT (#13819)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/13948/head
Xiaofan 2021-12-22 11:39:03 +08:00 committed by GitHub
parent f6e23458aa
commit 02d1282949
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 216 additions and 53 deletions

View File

@ -98,7 +98,9 @@ func (kv *RocksdbKV) Load(key string) (string, error) {
if key == "" {
return "", errors.New("rocksdb kv does not support load empty key")
}
value, err := kv.DB.Get(kv.ReadOptions, []byte(key))
option := gorocksdb.NewDefaultReadOptions()
defer option.Destroy()
value, err := kv.DB.Get(option, []byte(key))
if err != nil {
return "", err
}
@ -112,16 +114,14 @@ func (kv *RocksdbKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
if kv.DB == nil {
return nil, nil, fmt.Errorf("rocksdb instance is nil when load %s", prefix)
}
kv.ReadOptions.SetPrefixSameAsStart(true)
if prefix != "" {
kv.ReadOptions.SetIterateUpperBound([]byte(typeutil.AddOne(prefix)))
}
iter := kv.DB.NewIterator(kv.ReadOptions)
option := gorocksdb.NewDefaultReadOptions()
defer option.Destroy()
iter := kv.DB.NewIterator(option)
defer iter.Close()
keys := make([]string, 0)
values := make([]string, 0)
iter.Seek([]byte(prefix))
for ; iter.Valid(); iter.Next() {
for ; iter.ValidForPrefix([]byte(prefix)); iter.Next() {
key := iter.Key()
value := iter.Value()
keys = append(keys, string(key.Data()))
@ -141,8 +141,10 @@ func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) {
return nil, errors.New("rocksdb instance is nil when do MultiLoad")
}
values := make([]string, 0, len(keys))
option := gorocksdb.NewDefaultReadOptions()
defer option.Destroy()
for _, key := range keys {
value, err := kv.DB.Get(kv.ReadOptions, []byte(key))
value, err := kv.DB.Get(option, []byte(key))
if err != nil {
return []string{}, err
}

View File

@ -513,7 +513,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
log.Error("RocksMQ: alloc id failed.", zap.Error(err))
return []UniqueID{}, err
}
allocTime := time.Since(start).Milliseconds()
if UniqueID(msgLen) != idEnd-idStart {
return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
}
@ -538,7 +538,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
log.Debug("RocksMQ: write batch failed")
return []UniqueID{}, err
}
writeTime := time.Since(start).Milliseconds()
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
select {
@ -565,12 +565,16 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
if err != nil {
return []UniqueID{}, err
}
updatePageTime := time.Since(start).Milliseconds()
// TODO add this to monitor metrics
getProduceTime := time.Since(start).Milliseconds()
if getLockTime > 200 || getProduceTime > 200 {
if getProduceTime > 200 {
log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
zap.Int64("get lock elapse", getLockTime), zap.Int64("produce elapse", getProduceTime))
zap.Int64("get lock elapse", getLockTime),
zap.Int64("alloc elapse", allocTime),
zap.Int64("write elapse", writeTime),
zap.Int64("updatePage elapse", updatePageTime))
}
return msgIDs, nil
}
@ -639,14 +643,13 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
readOpts.SetPrefixSameAsStart(true)
readOpts.SetIterateUpperBound([]byte(typeutil.AddOne(topicName + "/")))
prefix := topicName + "/"
iter := rmq.store.NewIterator(readOpts)
defer iter.Close()
var dataKey string
if currentID == DefaultMessageID {
dataKey = topicName + "/"
dataKey = prefix
} else {
dataKey = path.Join(topicName, strconv.FormatInt(currentID.(int64), 10))
}
@ -654,7 +657,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
consumerMessage := make([]ConsumerMessage, 0, n)
offset := 0
for ; iter.Valid() && offset < n; iter.Next() {
for ; iter.ValidForPrefix([]byte(prefix)) && offset < n; iter.Next() {
key := iter.Key()
val := iter.Value()
strKey := string(key.Data())
@ -697,7 +700,6 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
}
newID := consumedIDs[len(consumedIDs)-1]
rmq.moveConsumePos(topicName, groupName, newID+1)
rmq.updateAckedInfo(topicName, groupName, consumedIDs)
// TODO add this to monitor metrics
getConsumeTime := time.Since(start).Milliseconds()
@ -784,8 +786,8 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
iter := rmq.store.NewIterator(readOpts)
defer iter.Close()
// 0 is the ASC value of "/" + 1
iter.SeekForPrev([]byte(topicName + "0"))
// seek to the last message of this topic
iter.SeekForPrev([]byte(typeutil.AddOne(topicName + "/")))
// if iteration fails
if err := iter.Err(); err != nil {
@ -801,7 +803,9 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
iKey := iter.Key()
seekMsgID := string(iKey.Data())
iKey.Free()
if iKey != nil {
iKey.Free()
}
// if the found message does not belong to the current channel, start from 0
if !strings.Contains(seekMsgID, fixTopicName) {
return nil
@ -841,19 +845,16 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
lastID := ids[len(ids)-1]
// 1. Try to get the page id between first ID and last ID of ids
pageMsgPrefix := constructKey(PageMsgSizeTitle, topicName)
pageMsgPrefix := constructKey(PageMsgSizeTitle, topicName) + "/"
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
pageMsgFirstKey := pageMsgPrefix + "/" + strconv.FormatInt(firstID, 10)
// set last key by lastID
pageMsgLastKey := pageMsgPrefix + "/" + strconv.FormatInt(lastID+1, 10)
pageMsgFirstKey := pageMsgPrefix + strconv.FormatInt(firstID, 10)
readOpts.SetIterateUpperBound([]byte(pageMsgLastKey))
iter := rmq.kv.(*rocksdbkv.RocksdbKV).DB.NewIterator(readOpts)
defer iter.Close()
var pageIDs []UniqueID
for iter.Seek([]byte(pageMsgFirstKey)); iter.Valid(); iter.Next() {
for iter.Seek([]byte(pageMsgFirstKey)); iter.ValidForPrefix([]byte(pageMsgPrefix)); iter.Next() {
key := iter.Key()
pageID, err := parsePageID(string(key.Data()))
if key != nil {
@ -943,6 +944,7 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI
reader := &rocksmqReader{
store: rmq.store,
topic: topicName,
prefix: []byte(topicName + "/"),
readerName: readerName,
readOpts: readOpts,
iter: iter,

View File

@ -25,6 +25,7 @@ import (
type rocksmqReader struct {
store *gorocksdb.DB
topic string
prefix []byte
readerName string
readOpts *gorocksdb.ReadOptions
@ -38,8 +39,7 @@ type rocksmqReader struct {
//Seek seek the rocksmq reader to the pointed position
func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet
rr.currentID = msgID
fixTopicName := rr.topic + "/"
dataKey := path.Join(fixTopicName, strconv.FormatInt(msgID, 10))
dataKey := path.Join(rr.topic, strconv.FormatInt(msgID, 10))
rr.iter.Seek([]byte(dataKey))
if !rr.messageIDInclusive {
rr.currentID++
@ -56,6 +56,10 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
key := iter.Key()
val := iter.Value()
tmpKey := string(key.Data())
if key != nil {
key.Free()
}
var msgID UniqueID
msgID, err = strconv.ParseInt(tmpKey[len(rr.topic)+1:], 10, 64)
msg = &ConsumerMessage{
@ -67,15 +71,17 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
msg.Payload = make([]byte, dataLen)
copy(msg.Payload, origData)
}
val.Free()
if val != nil {
val.Free()
}
iter.Next()
rr.currentID = msgID
}
if iter.Valid() {
if iter.ValidForPrefix(rr.prefix) {
getMsg()
return msg, err
}
// TODO this is the same logic as pulsar reader, but do we really need to read till the end of the stream
select {
case <-ctx.Done():
log.Debug("Stop get next reader message!")
@ -87,11 +93,10 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
}
rr.iter.Close()
rr.iter = rr.store.NewIterator(rr.readOpts)
fixTopicName := rr.topic + "/"
dataKey := path.Join(fixTopicName, strconv.FormatInt(rr.currentID+1, 10))
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
iter = rr.iter
iter.Seek([]byte(dataKey))
if !iter.Valid() {
if !iter.ValidForPrefix(rr.prefix) {
return nil, errors.New("reader iterater is still invalid after receive mutex")
}
getMsg()
@ -100,7 +105,7 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
}
func (rr *rocksmqReader) HasNext() bool {
if rr.iter.Valid() {
if rr.iter.ValidForPrefix(rr.prefix) {
return true
}
@ -111,10 +116,9 @@ func (rr *rocksmqReader) HasNext() bool {
}
rr.iter.Close()
rr.iter = rr.store.NewIterator(rr.readOpts)
fixTopicName := rr.topic + "/"
dataKey := path.Join(fixTopicName, strconv.FormatInt(rr.currentID+1, 10))
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
rr.iter.Seek([]byte(dataKey))
return rr.iter.Valid()
return rr.iter.ValidForPrefix(rr.prefix)
default:
return false
}

View File

@ -21,8 +21,6 @@ import (
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/tecbot/gorocksdb"
"go.uber.org/zap"
)
@ -136,6 +134,7 @@ func (ri *retentionInfo) Stop() {
// 4. delete message by range of page id;
func (ri *retentionInfo) expiredCleanUp(topic string) error {
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
start := time.Now()
var deletedAckedSize int64
var pageCleaned UniqueID
var pageEndID UniqueID
@ -154,14 +153,12 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
}
pageReadOpts := gorocksdb.NewDefaultReadOptions()
defer pageReadOpts.Destroy()
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic)
// ensure the iterator won't iterate to other topics
pageReadOpts.SetIterateUpperBound([]byte(typeutil.AddOne(pageMsgPrefix)))
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
pageIter := ri.kv.DB.NewIterator(pageReadOpts)
defer pageIter.Close()
pageIter.Seek([]byte(pageMsgPrefix))
for ; pageIter.Valid(); pageIter.Next() {
for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() {
pKey := pageIter.Key()
pageID, err := parsePageID(string(pKey.Data()))
if pKey != nil {
@ -240,8 +237,10 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic))
return nil
}
expireTime := time.Since(start).Milliseconds()
log.Debug("Expired check by message size: ", zap.Any("topic", topic),
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize), zap.Any("pageCleaned", pageCleaned))
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize),
zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", expireTime))
return ri.cleanData(topic, pageEndID)
}
@ -250,14 +249,13 @@ func (ri *retentionInfo) calculateTopicAckedSize(topic string) (int64, error) {
pageReadOpts := gorocksdb.NewDefaultReadOptions()
defer pageReadOpts.Destroy()
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic)
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
// ensure the iterator won't iterate to other topics
pageReadOpts.SetIterateUpperBound([]byte(typeutil.AddOne(pageMsgPrefix)))
pageIter := ri.kv.DB.NewIterator(pageReadOpts)
defer pageIter.Close()
pageIter.Seek([]byte(pageMsgPrefix))
var ackedSize int64
for ; pageIter.Valid(); pageIter.Next() {
for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() {
key := pageIter.Key()
pageID, err := parsePageID(string(key.Data()))
if key != nil {

View File

@ -228,6 +228,163 @@ func TestRmqRetention_NotConsumed(t *testing.T) {
// Test multiple topic
func TestRmqRetention_MultipleTopic(t *testing.T) {
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirALl error for path", zap.Any("path", retentionPath))
return
}
defer os.RemoveAll(retentionPath)
// no retention by size
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
// retention by secs
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
kvPath := retentionPath + "kv_multi_topic"
os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := retentionPath + "db_multi_topic"
os.RemoveAll(rocksdbPath)
metaPath := retentionPath + "meta_multi_topic"
os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
topicName := "topic_a"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
msgNum := 100
pMsgs := make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
ids1, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids1))
topicName = "topic_b"
err = rmq.CreateTopic(topicName)
assert.Nil(t, err)
defer rmq.DestroyTopic(topicName)
pMsgs = make([]ProducerMessage, msgNum)
for i := 0; i < msgNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
ids2, err := rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(pMsgs), len(ids2))
topicName = "topic_a"
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
assert.NoError(t, err)
consumer := &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
cMsgs := make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
assert.Equal(t, cMsgs[0].MsgID, ids1[0])
time.Sleep(time.Duration(3) * time.Second)
err = rmq.Seek(topicName, groupName, ids1[10])
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 0)
// test acked size acked ts and other meta are updated as expect
msgSizeKey := MessageSizeTitle + topicName
msgSizeVal, err := rmq.kv.Load(msgSizeKey)
assert.NoError(t, err)
assert.Equal(t, msgSizeVal, "0")
// 100 page left, each entity is a page
pageMsgSizeKey := constructKey(PageMsgSizeTitle, "topic_a")
keys, values, err := rmq.kv.LoadWithPrefix(pageMsgSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
pageTsSizeKey := constructKey(PageTsTitle, "topic_a")
keys, values, err = rmq.kv.LoadWithPrefix(pageTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
aclTsSizeKey := constructKey(AckedTsTitle, "topic_a")
keys, values, err = rmq.kv.LoadWithPrefix(aclTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
// for topic B, nothing has been cleadn
pageMsgSizeKey = constructKey(PageMsgSizeTitle, "topic_b")
keys, values, err = rmq.kv.LoadWithPrefix(pageMsgSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 50)
assert.Equal(t, len(values), 50)
pageTsSizeKey = constructKey(PageTsTitle, "topic_b")
keys, values, err = rmq.kv.LoadWithPrefix(pageTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 50)
assert.Equal(t, len(values), 50)
aclTsSizeKey = constructKey(AckedTsTitle, "topic_b")
keys, values, err = rmq.kv.LoadWithPrefix(aclTsSizeKey)
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
assert.Equal(t, len(values), 0)
topicName = "topic_b"
_ = rmq.DestroyConsumerGroup(topicName, groupName)
err = rmq.CreateConsumerGroup(topicName, groupName)
assert.NoError(t, err)
consumer = &Consumer{
Topic: topicName,
GroupName: groupName,
}
rmq.RegisterConsumer(consumer)
cMsgs = make([]ConsumerMessage, 0)
for i := 0; i < msgNum; i++ {
cMsg, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
assert.Equal(t, cMsgs[0].MsgID, ids2[0])
time.Sleep(time.Duration(3) * time.Second)
err = rmq.Seek(topicName, groupName, ids2[10])
assert.Nil(t, err)
newRes, err = rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 0)
}
@ -417,7 +574,7 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1)
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1)
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
atomic.StoreInt64(&TickerTimeInSeconds, 1)
kvPath := retentionPath + "kv_com2"
os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
@ -466,14 +623,14 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
cMsgs = append(cMsgs, cMsg[0])
}
assert.Equal(t, len(cMsgs), msgNum)
time.Sleep(time.Duration(3) * time.Second)
log.Debug("Already consumed, wait for message cleaned by retention")
// wait for enough time for page expiration
time.Sleep(time.Duration(2) * time.Second)
err = rmq.Seek(topicName, groupName, ids[0])
assert.Nil(t, err)
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 1)
// make sure clean up happens
// TODO(yukun): Sometimes failed
// assert.True(t, newRes[0].MsgID > ids[0])
assert.True(t, newRes[0].MsgID > ids[0])
}