mirror of https://github.com/milvus-io/milvus.git
Support trigger compaction everyday to eliminiate deleted data in rocksmq (#18752)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com> Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/18876/head
parent
d28114c28c
commit
8e22d03cf3
|
@ -111,8 +111,9 @@ rocksmq:
|
|||
# please adjust in embedded Milvus: /tmp/milvus/rdb_data
|
||||
path: /var/lib/milvus/rdb_data # The path where the message is stored in rocksmq
|
||||
rocksmqPageSize: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024 bytes, The size of each page of messages in rocksmq
|
||||
retentionTimeInMinutes: 10080 # 7 days, 7 * 24 * 60 minutes, The retention time of the message in rocksmq.
|
||||
retentionTimeInMinutes: 7200 # 5 days, 5 * 24 * 60 minutes, The retention time of the message in rocksmq.
|
||||
retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq.
|
||||
compactionInterval: 86400 # 1 day, trigger rocksdb compaction every day to remove deleted data
|
||||
lrucacheratio: 0.06 # rocksdb cache memory ratio
|
||||
|
||||
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
|
||||
|
|
|
@ -65,6 +65,6 @@ func (p *producer) Send(message *ProducerMessage) (UniqueID, error) {
|
|||
func (p *producer) Close() {
|
||||
err := p.c.server.DestroyTopic(p.topic)
|
||||
if err != nil {
|
||||
log.Debug("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err))
|
||||
log.Warn("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,26 +77,7 @@ func InitRocksMQ(path string) error {
|
|||
log.Warn("rocksmq.rocksmqPageSize is invalid, using default value 2G")
|
||||
}
|
||||
}
|
||||
rawRmqRetentionTimeInMinutes, err := params.Load("rocksmq.retentionTimeInMinutes")
|
||||
if err == nil && rawRmqRetentionTimeInMinutes != "" {
|
||||
rawRmqRetentionTimeInMinutes, err := strconv.ParseInt(rawRmqRetentionTimeInMinutes, 10, 64)
|
||||
if err == nil {
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60)
|
||||
} else {
|
||||
log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value")
|
||||
}
|
||||
}
|
||||
rawRmqRetentionSizeInMB, err := params.Load("rocksmq.retentionSizeInMB")
|
||||
if err == nil && rawRmqRetentionSizeInMB != "" {
|
||||
rawRmqRetentionSizeInMB, err := strconv.ParseInt(rawRmqRetentionSizeInMB, 10, 64)
|
||||
if err == nil {
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, rawRmqRetentionSizeInMB)
|
||||
} else {
|
||||
log.Warn("rocksmq.retentionSizeInMB is invalid, using default value 0")
|
||||
}
|
||||
}
|
||||
log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", rawRmqRetentionTimeInMinutes),
|
||||
zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB), zap.Any("RocksmqPageSize", RocksmqPageSize))
|
||||
|
||||
Rmq, finalErr = NewRocksMQ(params, path, nil)
|
||||
})
|
||||
return finalErr
|
||||
|
|
|
@ -31,7 +31,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/tecbot/gorocksdb"
|
||||
|
@ -114,19 +113,6 @@ func checkRetention() bool {
|
|||
return RocksmqRetentionTimeInSecs != -1 || RocksmqRetentionSizeInMB != -1
|
||||
}
|
||||
|
||||
func getNowTs(idAllocator allocator.GIDAllocator) (int64, error) {
|
||||
err := idAllocator.UpdateID()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
newID, err := idAllocator.AllocOne()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
nowTs, _ := tsoutil.ParseTS(uint64(newID))
|
||||
return nowTs.Unix(), err
|
||||
}
|
||||
|
||||
var topicMu = sync.Map{}
|
||||
|
||||
type rocksmq struct {
|
||||
|
@ -227,7 +213,7 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.
|
|||
readers: sync.Map{},
|
||||
}
|
||||
|
||||
ri, err := initRetentionInfo(kv, db)
|
||||
ri, err := initRetentionInfo(params, kv, db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
|
|||
params.Init()
|
||||
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
|
||||
assert.NoError(t, err)
|
||||
defer rmq.stopRetention()
|
||||
defer rmq.Close()
|
||||
|
||||
topicName := "topic_register"
|
||||
groupName := "group_register"
|
||||
|
@ -231,7 +231,6 @@ func TestRocksmq_Dummy(t *testing.T) {
|
|||
err = rmq.Seek(channelName1, groupName1, 0)
|
||||
assert.Error(t, err)
|
||||
|
||||
rmq.stopRetention()
|
||||
channelName2 := strings.Repeat(channelName1, 100)
|
||||
err = rmq.CreateTopic(string(channelName2))
|
||||
assert.NoError(t, err)
|
||||
|
|
|
@ -21,16 +21,23 @@ import (
|
|||
|
||||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/tecbot/gorocksdb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// RocksmqRetentionTimeInMinutes is the time of retention
|
||||
var RocksmqRetentionTimeInSecs int64 = 10080 * 60
|
||||
var RocksmqRetentionTimeInSecs int64
|
||||
var DefaultRocksmqRetentionTimeInMins int64 = 7200
|
||||
|
||||
// RocksmqRetentionSizeInMB is the size of retention
|
||||
var RocksmqRetentionSizeInMB int64 = 8192
|
||||
var RocksmqRetentionSizeInMB int64
|
||||
var DefaultRocksmqRetentionSizeInMB int64 = 8192
|
||||
|
||||
// RocksmqRetentionCompactionInterval is the Interval we trigger compaction,
|
||||
var RocksmqRetentionCompactionInterval int64
|
||||
var DefaultRocksmqRetentionCompactionInterval int64 = 86400
|
||||
|
||||
// Const value that used to convert unit
|
||||
const (
|
||||
|
@ -38,7 +45,7 @@ const (
|
|||
)
|
||||
|
||||
// TickerTimeInSeconds is the time of expired check, default 10 minutes
|
||||
var TickerTimeInSeconds int64 = 60
|
||||
var TickerTimeInSeconds int64 = 600
|
||||
|
||||
type retentionInfo struct {
|
||||
// key is topic name, value is last retention time
|
||||
|
@ -53,7 +60,11 @@ type retentionInfo struct {
|
|||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
|
||||
func initRetentionInfo(params paramtable.BaseTable, kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
|
||||
rawRmqRetentionTimeInMinutes := params.ParseInt64WithDefault("rocksmq.retentionTimeInMinutes", DefaultRocksmqRetentionTimeInMins)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60)
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, params.ParseInt64WithDefault("rocksmq.retentionSizeInMB", DefaultRocksmqRetentionSizeInMB))
|
||||
atomic.StoreInt64(&RocksmqRetentionCompactionInterval, params.ParseInt64WithDefault("rocksmq.compactionInterval", DefaultRocksmqRetentionCompactionInterval))
|
||||
ri := &retentionInfo{
|
||||
topicRetetionTime: sync.Map{},
|
||||
mutex: sync.RWMutex{},
|
||||
|
@ -86,15 +97,22 @@ func (ri *retentionInfo) startRetentionInfo() {
|
|||
// retention do time ticker and trigger retention check and operation for each topic
|
||||
func (ri *retentionInfo) retention() error {
|
||||
log.Debug("Rocksmq retention goroutine start!")
|
||||
// Do retention check every 6s
|
||||
// Do retention check every 10 mins
|
||||
ticker := time.NewTicker(time.Duration(atomic.LoadInt64(&TickerTimeInSeconds) * int64(time.Second)))
|
||||
defer ticker.Stop()
|
||||
compactionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&RocksmqRetentionCompactionInterval) * int64(time.Second)))
|
||||
defer compactionTicker.Stop()
|
||||
defer ri.closeWg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ri.closeCh:
|
||||
log.Debug("Rocksmq retention finish!")
|
||||
log.Warn("Rocksmq retention finish!")
|
||||
return nil
|
||||
case <-compactionTicker.C:
|
||||
log.Info("trigger rocksdb compaction, should trigger rocksdb data clean")
|
||||
go ri.db.CompactRange(gorocksdb.Range{Start: nil, Limit: nil})
|
||||
go ri.kv.DB.CompactRange(gorocksdb.Range{Start: nil, Limit: nil})
|
||||
case t := <-ticker.C:
|
||||
timeNow := t.Unix()
|
||||
checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInSecs) / 10
|
||||
|
@ -354,7 +372,6 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug("Delete message for topic", zap.String("topic", topic), zap.Int64("startID", startID), zap.Int64("endID", endID))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -39,11 +39,6 @@ func TestRmqRetention_Basic(t *testing.T) {
|
|||
return
|
||||
}
|
||||
defer os.RemoveAll(retentionPath)
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 2)
|
||||
|
||||
rocksdbPath := retentionPath
|
||||
defer os.RemoveAll(rocksdbPath)
|
||||
metaPath := retentionPath + metaPathSuffix
|
||||
|
@ -52,9 +47,12 @@ func TestRmqRetention_Basic(t *testing.T) {
|
|||
var params paramtable.BaseTable
|
||||
params.Init()
|
||||
rmq, err := NewRocksMQ(params, rocksdbPath, nil)
|
||||
defer rmq.Close()
|
||||
assert.Nil(t, err)
|
||||
defer rmq.stopRetention()
|
||||
defer rmq.Close()
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 2)
|
||||
|
||||
topicName := "topic_a"
|
||||
err = rmq.CreateTopic(topicName)
|
||||
|
@ -133,10 +131,6 @@ func TestRmqRetention_NotConsumed(t *testing.T) {
|
|||
return
|
||||
}
|
||||
defer os.RemoveAll(retentionPath)
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 2)
|
||||
|
||||
rocksdbPath := retentionPath
|
||||
defer os.RemoveAll(rocksdbPath)
|
||||
|
@ -146,9 +140,13 @@ func TestRmqRetention_NotConsumed(t *testing.T) {
|
|||
var params paramtable.BaseTable
|
||||
params.Init()
|
||||
rmq, err := NewRocksMQ(params, rocksdbPath, nil)
|
||||
defer rmq.Close()
|
||||
assert.Nil(t, err)
|
||||
defer rmq.stopRetention()
|
||||
defer rmq.Close()
|
||||
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 2)
|
||||
|
||||
topicName := "topic_a"
|
||||
err = rmq.CreateTopic(topicName)
|
||||
|
@ -238,12 +236,6 @@ func TestRmqRetention_MultipleTopic(t *testing.T) {
|
|||
return
|
||||
}
|
||||
defer os.RemoveAll(retentionPath)
|
||||
// no retention by size
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
|
||||
// retention by secs
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 1)
|
||||
kvPath := retentionPath + "kv_multi_topic"
|
||||
os.RemoveAll(kvPath)
|
||||
idAllocator := InitIDAllocator(kvPath)
|
||||
|
@ -258,6 +250,13 @@ func TestRmqRetention_MultipleTopic(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
defer rmq.Close()
|
||||
|
||||
// no retention by size
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
|
||||
// retention by secs
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 1)
|
||||
|
||||
topicName := "topic_a"
|
||||
err = rmq.CreateTopic(topicName)
|
||||
assert.Nil(t, err)
|
||||
|
@ -456,12 +455,7 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
|
|||
return
|
||||
}
|
||||
defer os.RemoveAll(retentionPath)
|
||||
// no retention by size
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
|
||||
// retention by secs
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 5)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 1)
|
||||
|
||||
kvPath := retentionPath + "kv_com1"
|
||||
os.RemoveAll(kvPath)
|
||||
idAllocator := InitIDAllocator(kvPath)
|
||||
|
@ -477,6 +471,13 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
defer rmq.Close()
|
||||
|
||||
// no retention by size
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
|
||||
// retention by secs
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 5)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 1)
|
||||
|
||||
topicName := "topic_a"
|
||||
err = rmq.CreateTopic(topicName)
|
||||
assert.Nil(t, err)
|
||||
|
@ -579,10 +580,6 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
|
|||
return
|
||||
}
|
||||
defer os.RemoveAll(retentionPath)
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 1)
|
||||
kvPath := retentionPath + "kv_com2"
|
||||
os.RemoveAll(kvPath)
|
||||
idAllocator := InitIDAllocator(kvPath)
|
||||
|
@ -598,6 +595,12 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
defer rmq.Close()
|
||||
|
||||
// update some configrocksmq_retentions to make cleanup trigger faster
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 1)
|
||||
|
||||
topicName := "topic_a"
|
||||
err = rmq.CreateTopic(topicName)
|
||||
assert.Nil(t, err)
|
||||
|
|
Loading…
Reference in New Issue