Remove acked infos in memory for retention (#8735)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/9571/merge
yukun 2021-10-09 17:24:55 +08:00 committed by GitHub
parent 302ca04d6c
commit b6ec1783e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 325 additions and 73 deletions

View File

@ -108,6 +108,10 @@ func constructKey(metaName, topic string) (string, error) {
return metaName + topic + string(nameBytes), nil
}
func checkRetention() bool {
return RocksmqRetentionTimeInMinutes != -1 && RocksmqRetentionSizeInMB != -1
}
var topicMu sync.Map = sync.Map{}
type rocksmq struct {
@ -161,8 +165,10 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
}
rmq.retentionInfo = ri
rmq.retentionInfo.startRetentionInfo()
log.Debug("Rocksmq start successfully ", zap.String("name", name))
if checkRetention() {
rmq.retentionInfo.startRetentionInfo()
}
return rmq, nil
}
@ -261,15 +267,17 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
if err != nil {
return nil
}
rmq.retentionInfo.mutex.Lock()
defer rmq.retentionInfo.mutex.Unlock()
rmq.retentionInfo.topics = append(rmq.retentionInfo.topics, topicName)
rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{
pageEndID: make([]UniqueID, 0),
pageMsgSize: map[UniqueID]int64{},
})
rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow)
rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{
ackedTs: map[UniqueID]int64{},
})
// rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{
// pageEndID: make([]UniqueID, 0),
// pageMsgSize: map[UniqueID]int64{},
// })
// rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow)
// rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{
// ackedTs: map[UniqueID]int64{},
// })
log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
@ -315,9 +323,15 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
}
topicMu.Delete(topicName)
rmq.retentionInfo.ackedInfo.Delete(topicName)
rmq.retentionInfo.lastRetentionTime.Delete(topicName)
rmq.retentionInfo.pageInfo.Delete(topicName)
for i, name := range rmq.retentionInfo.topics {
if topicName == name {
rmq.retentionInfo.topics = append(rmq.retentionInfo.topics[:i], rmq.retentionInfo.topics[i+1:]...)
break
}
}
// rmq.retentionInfo.ackedInfo.Delete(topicName)
// rmq.retentionInfo.lastRetentionTime.Delete(topicName)
// rmq.retentionInfo.pageInfo.Delete(topicName)
log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
@ -527,11 +541,11 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
return err
}
if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok {
pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID)
pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize
rmq.retentionInfo.pageInfo.Store(topicName, pageInfo)
}
// if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok {
// pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID)
// pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize
// rmq.retentionInfo.pageInfo.Store(topicName, pageInfo)
// }
// Update message size to 0
err = rmq.kv.Save(msgSizeKey, strconv.FormatInt(0, 10))
@ -779,11 +793,11 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
if err != nil {
return err
}
if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
ackedInfo := info.(*topicAckedInfo)
ackedInfo.ackedTs[minBeginID] = ts
rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
}
// if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
// ackedInfo := info.(*topicAckedInfo)
// ackedInfo.ackedTs[minBeginID] = ts
// rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
// }
if minBeginID == newID {
// Means the begin_id of topic update to newID, so needs to update acked size
ackedSizeKey := AckedSizeTitle + topicName
@ -800,11 +814,11 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
if err != nil {
return err
}
if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
ackedInfo := info.(*topicAckedInfo)
ackedInfo.ackedSize = ackedSize
rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
}
// if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
// ackedInfo := info.(*topicAckedInfo)
// ackedInfo.ackedSize = ackedSize
// rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
// }
}
}
return nil

View File

@ -33,8 +33,8 @@ var RocksmqRetentionTimeInMinutes int64
// RocksmqRetentionSizeInMB is the size of retention
var RocksmqRetentionSizeInMB int64
// TickerTimeInMinutes is the time of expired check
var TickerTimeInMinutes int64 = 1
// TickerTimeInSeconds is the time of expired check
var TickerTimeInSeconds int64 = 6
// Const value that used to convert unit
const (
@ -66,6 +66,8 @@ type retentionInfo struct {
// lastRetentionTime map[string]int64
lastRetentionTime sync.Map
mutex sync.RWMutex
kv *rocksdbkv.RocksdbKV
db *gorocksdb.DB
}
@ -105,6 +107,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
pageInfo: sync.Map{},
ackedInfo: sync.Map{},
lastRetentionTime: sync.Map{},
mutex: sync.RWMutex{},
kv: kv,
db: db,
}
@ -124,19 +127,16 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
// Before do retention, load retention info from rocksdb to retention info structure in goroutines.
// Because loadRetentionInfo may need some time, so do this asynchronously. Finally start retention goroutine.
func (ri *retentionInfo) startRetentionInfo() {
var wg sync.WaitGroup
err := ri.kv.ResetPrefixLength(FixedChannelNameLen)
if err != nil {
log.Warn("Start load retention info", zap.Error(err))
}
for _, topic := range ri.topics {
log.Debug("Start load retention info", zap.Any("topic", topic))
// Load all page infos
wg.Add(1)
go ri.loadRetentionInfo(topic, &wg)
}
wg.Wait()
log.Debug("Finish load retention info, start retention")
// var wg sync.WaitGroup
ri.kv.ResetPrefixLength(FixedChannelNameLen)
// for _, topic := range ri.topics {
// log.Debug("Start load retention info", zap.Any("topic", topic))
// Load all page infos
// wg.Add(1)
// go ri.loadRetentionInfo(topic, &wg)
// }
// wg.Wait()
// log.Debug("Finish load retention info, start retention")
go ri.retention()
}
@ -277,7 +277,7 @@ func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) {
func (ri *retentionInfo) retention() error {
log.Debug("Rocksmq retention goroutine start!")
// Do retention check every 6s
ticker := time.NewTicker(time.Duration(TickerTimeInMinutes * int64(time.Minute) / 10))
ticker := time.NewTicker(time.Duration(TickerTimeInSeconds * int64(time.Second)))
for {
select {
@ -286,21 +286,242 @@ func (ri *retentionInfo) retention() error {
return nil
case t := <-ticker.C:
timeNow := t.Unix()
checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * 60 / 10
checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10
log.Debug("In ticker: ", zap.Any("ticker", timeNow))
ri.lastRetentionTime.Range(func(k, v interface{}) bool {
if v.(int64)+checkTime < timeNow {
err := ri.expiredCleanUp(k.(string))
ri.mutex.RLock()
for _, topic := range ri.topics {
lastRetentionTsKey := LastRetTsTitle + topic
lastRetentionTsVal, err := ri.kv.Load(lastRetentionTsKey)
if err != nil || lastRetentionTsVal == "" {
log.Warn("Can't get lastRetentionTs", zap.Any("lastRetentionTsKey", lastRetentionTsKey))
continue
}
lastRetentionTs, err := strconv.ParseInt(lastRetentionTsVal, 10, 64)
if err != nil {
log.Warn("Can't parse lastRetentionTsVal to int", zap.Any("lastRetentionTsKey", lastRetentionTsKey))
continue
}
if lastRetentionTs+checkTime < timeNow {
err := ri.newExpiredCleanUp(topic)
if err != nil {
log.Warn("Retention expired clean failed", zap.Any("error", err))
}
}
return true
})
}
ri.mutex.RUnlock()
// ri.lastRetentionTime.Range(func(k, v interface{}) bool {
// if v.(int64)+checkTime < timeNow {
// err := ri.newExpiredCleanUp(k.(string))
// if err != nil {
// log.Warn("Retention expired clean failed", zap.Any("error", err))
// }
// }
// return true
// })
}
}
}
func (ri *retentionInfo) newExpiredCleanUp(topic string) error {
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
ll, ok := topicMu.Load(topic)
if !ok {
return fmt.Errorf("topic name = %s not exist", topic)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return fmt.Errorf("get mutex failed, topic name = %s", topic)
}
lock.Lock()
defer lock.Unlock()
var deletedAckedSize int64 = 0
var startID UniqueID
var endID UniqueID
var pageStartID UniqueID = 0
var err error
fixedAckedTsKey, _ := constructKey(AckedTsTitle, topic)
pageReadOpts := gorocksdb.NewDefaultReadOptions()
defer pageReadOpts.Destroy()
pageReadOpts.SetPrefixSameAsStart(true)
pageIter := ri.kv.DB.NewIterator(pageReadOpts)
defer pageIter.Close()
pageMsgPrefix, _ := constructKey(PageMsgSizeTitle, topic)
pageIter.Seek([]byte(pageMsgPrefix))
if pageIter.Valid() {
pageStartID, err = strconv.ParseInt(string(pageIter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
if err != nil {
return err
}
for ; pageIter.Valid(); pageIter.Next() {
pKey := pageIter.Key()
pageID, err := strconv.ParseInt(string(pKey.Data())[FixedChannelNameLen+1:], 10, 64)
if pKey != nil {
pKey.Free()
}
if err != nil {
return err
}
ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
ackedTsVal, err := ri.kv.Load(ackedTsKey)
if err != nil {
return err
}
ackedTs, err := strconv.ParseInt(ackedTsVal, 10, 64)
if err != nil {
return err
}
if msgTimeExpiredCheck(ackedTs) {
endID = pageID
pValue := pageIter.Value()
size, err := strconv.ParseInt(string(pValue.Data()), 10, 64)
if pValue != nil {
pValue.Free()
}
if err != nil {
return err
}
deletedAckedSize += size
} else {
break
}
}
}
pageEndID := endID
ackedReadOpts := gorocksdb.NewDefaultReadOptions()
defer ackedReadOpts.Destroy()
ackedReadOpts.SetPrefixSameAsStart(true)
ackedIter := ri.kv.DB.NewIterator(ackedReadOpts)
defer ackedIter.Close()
if err != nil {
return err
}
ackedIter.Seek([]byte(fixedAckedTsKey))
if !ackedIter.Valid() {
return nil
}
startID, err = strconv.ParseInt(string(ackedIter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
if err != nil {
return err
}
if endID > startID {
newPos := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10)
ackedIter.Seek([]byte(newPos))
}
for ; ackedIter.Valid(); ackedIter.Next() {
aKey := ackedIter.Key()
aValue := ackedIter.Value()
ackedTs, err := strconv.ParseInt(string(aValue.Data()), 10, 64)
if aValue != nil {
aValue.Free()
}
if err != nil {
if aKey != nil {
aKey.Free()
}
return err
}
if msgTimeExpiredCheck(ackedTs) {
endID, err = strconv.ParseInt(string(aKey.Data())[FixedChannelNameLen+1:], 10, 64)
if aKey != nil {
aKey.Free()
}
if err != nil {
return err
}
} else {
if aKey != nil {
aKey.Free()
}
break
}
}
if endID == 0 {
log.Debug("All messaged are not time expired")
}
log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
ackedSizeKey := AckedSizeTitle + topic
totalAckedSizeVal, err := ri.kv.Load(ackedSizeKey)
if err != nil {
return err
}
totalAckedSize, err := strconv.ParseInt(totalAckedSizeVal, 10, 64)
if err != nil {
return err
}
for ; pageIter.Valid(); pageIter.Next() {
pValue := pageIter.Value()
size, err := strconv.ParseInt(string(pValue.Data()), 10, 64)
if pValue != nil {
pValue.Free()
}
if err != nil {
return err
}
curDeleteSize := deletedAckedSize + size
if msgSizeExpiredCheck(curDeleteSize, totalAckedSize) {
pKey := pageIter.Key()
endID, err = strconv.ParseInt(string(pKey.Data())[FixedChannelNameLen+1:], 10, 64)
if pKey != nil {
pKey.Free()
}
if err != nil {
return err
}
pageEndID = endID
deletedAckedSize += size
} else {
break
}
}
if endID == 0 {
log.Debug("All messages are not expired")
return nil
}
log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
pageStartIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageStartID, 10)
pageEndIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageEndID, 10)
if pageStartID == pageEndID {
writeBatch.Delete([]byte(pageStartIDKey))
} else if pageStartID < pageEndID {
writeBatch.DeleteRange([]byte(pageStartIDKey), []byte(pageEndIDKey))
}
ackedStartIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(startID))
ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(endID))
if startID > endID {
return nil
} else if startID == endID {
writeBatch.Delete([]byte(ackedStartIDKey))
} else {
writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
}
newAckedSize := totalAckedSize - deletedAckedSize
writeBatch.Put([]byte(ackedSizeKey), []byte(strconv.FormatInt(newAckedSize, 10)))
writeOpts := gorocksdb.NewDefaultWriteOptions()
defer writeOpts.Destroy()
ri.kv.DB.Write(writeOpts, writeBatch)
return DeleteMessages(ri.db, topic, startID, endID)
}
/*
// 1. Obtain pageAckedInfo and do time expired check, get the expired page scope;
// 2. Do iteration in the page after the last page in step 1 and get the last time expired message id;
// 3. Do size expired check in next page, and get the last size expired message id;
@ -494,6 +715,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
return DeleteMessages(ri.db, topic, startID, endID)
}
*/
// DeleteMessages in rocksdb by range of [startID, endID)
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {

View File

@ -52,6 +52,8 @@ func genRandonName() string {
func TestRmqRetention(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
atomic.StoreInt64(&TickerTimeInSeconds, 2)
defer atomic.StoreInt64(&TickerTimeInSeconds, 6)
kvPath := retentionPath + kvPathSuffix
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
@ -100,7 +102,7 @@ func TestRmqRetention(t *testing.T) {
}
assert.Equal(t, len(cMsgs), msgNum)
checkTimeInterval := 6
checkTimeInterval := 2
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
// Seek to a previous consumed message, the message should be clean up
err = rmq.Seek(topicName, groupName, cMsgs[msgNum/2].MsgID)
@ -108,6 +110,15 @@ func TestRmqRetention(t *testing.T) {
newRes, err := rmq.Consume(topicName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(newRes), 0)
//////////////////////////////////////////////////
lastRetTsKey := LastRetTsTitle + topicName
rmq.kv.Save(lastRetTsKey, "")
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
//////////////////////////////////////////////////
rmq.kv.Save(lastRetTsKey, "dummy")
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
}
func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
@ -293,35 +304,40 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) {
//////////////////////////////////////////////////
fixedPageSizeKey1, _ := constructKey(PageMsgSizeTitle, topicName)
pageMsgSizeKey1 := fixedPageSizeKey1 + "/" + "dummy"
err = rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy")
assert.Nil(t, err)
rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy")
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey1)
assert.Nil(t, err)
rmq.retentionInfo.kv.Remove(pageMsgSizeKey1)
//////////////////////////////////////////////////
topicMu.Delete(topicName)
topicMu.Store(topicName, &sync.Mutex{})
err = rmq.retentionInfo.expiredCleanUp(topicName)
assert.Nil(t, err)
//////////////////////////////////////////////////
topicMu.Delete(topicName)
err = rmq.retentionInfo.expiredCleanUp(topicName)
// TopicName has been deleted in line 310
assert.NotNil(t, err)
//////////////////////////////////////////////////
rmq.retentionInfo.ackedInfo.Delete(topicName)
err = rmq.retentionInfo.expiredCleanUp(topicName)
assert.Nil(t, err)
pageMsgPrefix, _ := constructKey(PageMsgSizeTitle, topicName)
pageMsgKey := pageMsgPrefix + "/dummy"
rmq.kv.Save(pageMsgKey, "0")
rmq.retentionInfo.newExpiredCleanUp(topicName)
//////////////////////////////////////////////////
rmq.retentionInfo.kv.DB = nil
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(pageMsgSizeKey1)
//////////////////////////////////////////////////
longTopic := strings.Repeat("dummy", 100)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(longTopic, &wg)
//////////////////////////////////////////////////
topicMu.Delete(topicName)
topicMu.Store(topicName, topicName)
rmq.retentionInfo.newExpiredCleanUp(topicName)
//////////////////////////////////////////////////
topicMu.Delete(topicName)
rmq.retentionInfo.newExpiredCleanUp(topicName)
//////////////////////////////////////////////////
rmq.retentionInfo.ackedInfo.Delete(topicName)
rmq.retentionInfo.newExpiredCleanUp(topicName)
}
func TestRmqRetention_Complex(t *testing.T) {
@ -394,13 +410,13 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
atomic.StoreInt64(&RocksmqPageSize, 10)
kvPath := retentionPath + "kv_com1"
defer os.RemoveAll(kvPath)
os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)
rocksdbPath := retentionPath + "db_com1"
defer os.RemoveAll(rocksdbPath)
os.RemoveAll(rocksdbPath)
metaPath := retentionPath + "meta_kv_com1"
defer os.RemoveAll(metaPath)
os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
assert.Nil(t, err)