Support rocksmq dynamic lru cache size (#17626)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/17635/head
Xiaofan 2022-06-20 10:56:12 +08:00 committed by GitHub
parent b491673218
commit d70f483806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 39 deletions

View File

@ -91,6 +91,7 @@ rocksmq:
rocksmqPageSize: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024 bytes, The size of each page of messages in rocksmq
retentionTimeInMinutes: 10080 # 7 days, 7 * 24 * 60 minutes, The retention time of the message in rocksmq.
retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq.
lrucacheratio: 0.06 # rocksdb cache memory ratio
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
rootCoord:

View File

@ -17,9 +17,9 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
@ -47,7 +47,9 @@ func newMockClient() *client {
func newRocksMQ(t *testing.T, rmqPath string) server.RocksMQ {
rocksdbPath := rmqPath
rmq, err := server.NewRocksMQ(rocksdbPath, nil)
var params paramtable.BaseTable
params.Init()
rmq, err := server.NewRocksMQ(params, rocksdbPath, nil)
assert.NoError(t, err)
return rmq
}

View File

@ -42,7 +42,8 @@ var params paramtable.BaseTable
// InitRmq is deprecate implementation of global rocksmq. will be removed later
func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error {
var err error
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
params.Init()
Rmq, err = NewRocksMQ(params, rocksdbName, idAllocator)
return err
}
@ -96,7 +97,7 @@ func InitRocksMQ(path string) error {
}
log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", rawRmqRetentionTimeInMinutes),
zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB), zap.Any("RocksmqPageSize", RocksmqPageSize))
Rmq, finalErr = NewRocksMQ(path, nil)
Rmq, finalErr = NewRocksMQ(params, path, nil)
})
return finalErr
}

View File

@ -27,6 +27,8 @@ import (
"github.com/milvus-io/milvus/internal/kv"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -44,13 +46,14 @@ type RmqState = int64
// RocksmqPageSize is the size of a message page, default 256MB
var RocksmqPageSize int64 = 256 << 20
// RocksDB cache size limitation(TODO config it)
var RocksDBLRUCacheMinCapacity = uint64(1 << 29)
var RocksDBLRUCacheMaxCapacity = uint64(4 << 30)
// Const variable that will be used in rocksmqs
const (
DefaultMessageID = -1
// TODO make it configable
RocksDBLRUCacheCapacity = 1 << 30
kvSuffix = "_meta_kv"
// topic_begin_id/topicName
@ -73,8 +76,6 @@ const (
// only in memory
CurrentIDSuffix = "current_id"
ReaderNamePrefix = "reader-"
RmqNotServingErrMsg = "Rocksmq is not serving"
)
@ -144,7 +145,7 @@ type rocksmq struct {
// 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname
// 2. Init retention info, load retention info to memory
// 3. Start retention goroutine
func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
// TODO we should use same rocksdb instance with different cfs
maxProcs := runtime.GOMAXPROCS(0)
parallelism := 1
@ -153,9 +154,24 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
} else if maxProcs > 8 {
parallelism = 2
}
log.Debug("Start rocksmq ", zap.Int("max proc", maxProcs), zap.Int("parallism", parallelism))
memoryCount := metricsinfo.GetMemoryCount()
// default rocks db cache is set with memory
rocksDBLRUCacheCapacity := RocksDBLRUCacheMinCapacity
if memoryCount > 0 {
ratio := params.ParseFloatWithDefault("rocksmq.lrucacheratio", 0.06)
calculatedCapacity := uint64(float64(memoryCount) * ratio)
if calculatedCapacity < RocksDBLRUCacheMinCapacity {
rocksDBLRUCacheCapacity = RocksDBLRUCacheMinCapacity
} else if calculatedCapacity > RocksDBLRUCacheMaxCapacity {
rocksDBLRUCacheCapacity = RocksDBLRUCacheMaxCapacity
} else {
rocksDBLRUCacheCapacity = calculatedCapacity
}
}
log.Debug("Start rocksmq ", zap.Int("max proc", maxProcs),
zap.Int("parallism", parallelism), zap.Uint64("lru cache", rocksDBLRUCacheCapacity))
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity))
bbto.SetBlockCache(gorocksdb.NewLRUCache(rocksDBLRUCacheCapacity))
optsKV := gorocksdb.NewDefaultOptions()
optsKV.SetBlockBasedTableFactory(bbto)
optsKV.SetCreateIfMissing(true)

View File

@ -73,7 +73,9 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.NoError(t, err)
defer rmq.stopRetention()
@ -137,7 +139,9 @@ func TestRocksmq_Basic(t *testing.T) {
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -192,12 +196,13 @@ func TestRocksmq_Dummy(t *testing.T) {
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
_, err = NewRocksMQ("", idAllocator)
_, err = NewRocksMQ(params, "", idAllocator)
assert.Error(t, err)
channelName := "channel_a"
@ -263,11 +268,13 @@ func TestRocksmq_Seek(t *testing.T) {
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
_, err = NewRocksMQ("", idAllocator)
_, err = NewRocksMQ(params, "", idAllocator)
assert.Error(t, err)
defer os.RemoveAll("_meta_kv")
@ -329,7 +336,9 @@ func TestRocksmq_Loop(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -399,7 +408,9 @@ func TestRocksmq_Goroutines(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -474,7 +485,9 @@ func TestRocksmq_Throughout(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -532,7 +545,9 @@ func TestRocksmq_MultiChan(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -584,7 +599,9 @@ func TestRocksmq_CopyData(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -650,7 +667,9 @@ func TestRocksmq_SeekToLatest(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -741,7 +760,9 @@ func TestRocksmq_GetLatestMsg(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
channelName := newChanName()
@ -815,7 +836,9 @@ func TestRocksmq_Close(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -846,7 +869,9 @@ func TestRocksmq_SeekWithNoConsumerError(t *testing.T) {
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()

View File

@ -19,7 +19,7 @@ import (
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
@ -49,7 +49,9 @@ func TestRmqRetention_Basic(t *testing.T) {
metaPath := retentionPath + metaPathSuffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, nil)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, nil)
defer rmq.Close()
assert.Nil(t, err)
defer rmq.stopRetention()
@ -141,7 +143,9 @@ func TestRmqRetention_NotConsumed(t *testing.T) {
metaPath := retentionPath + metaPathSuffix
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, nil)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, nil)
defer rmq.Close()
assert.Nil(t, err)
defer rmq.stopRetention()
@ -248,8 +252,9 @@ func TestRmqRetention_MultipleTopic(t *testing.T) {
os.RemoveAll(rocksdbPath)
metaPath := retentionPath + "meta_multi_topic"
os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -406,7 +411,9 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
defer os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
assert.NotNil(t, rmq)
@ -415,8 +422,7 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
assert.Nil(t, err)
rmq.Close()
rmq, err = NewRocksMQ(rocksdbPath, idAllocator)
rmq, err = NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
assert.NotNil(t, rmq)
@ -465,7 +471,9 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) {
metaPath := retentionPath + "meta_kv_com1"
os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
@ -584,7 +592,9 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
metaPath := retentionPath + "meta_kv_com2"
os.RemoveAll(metaPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
var params paramtable.BaseTable
params.Init()
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
assert.Nil(t, err)
defer rmq.Close()