mirror of https://github.com/milvus-io/milvus.git
Add rocksmqPageSize to config (#11113)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>pull/11389/head
parent
dc9e8566a0
commit
8ec040a35c
|
@ -43,9 +43,10 @@ pulsar:
|
|||
maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar.
|
||||
|
||||
rocksmq:
|
||||
path: /var/lib/milvus/rdb_data # The path where the message is stored in rocksmq
|
||||
retentionTimeInMinutes: 4320 # The time the messages are saved in rocksmq
|
||||
retentionSizeInMB: 0 # The size of the message saved in rocksmq
|
||||
path: /var/lib/milvus/rdb_data
|
||||
rocksmqPageSize: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024
|
||||
retentionTimeInMinutes: 4320 # 3 days, 3 * 24 * 60
|
||||
retentionSizeInMB: 0
|
||||
|
||||
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
|
||||
rootCoord:
|
||||
|
|
|
@ -13,6 +13,7 @@ package rocksmq
|
|||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -64,9 +65,35 @@ func InitRocksMQ() error {
|
|||
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
|
||||
_ = idAllocator.Initialize()
|
||||
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, params.ParseInt64("rocksmq.retentionTimeInMinutes"))
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, params.ParseInt64("rocksmq.retentionSizeInMB"))
|
||||
log.Debug("Rocksmq retention: ", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes), zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB))
|
||||
rawRmqPageSize, err := params.Load("rocksmq.rocksmqPageSize")
|
||||
if err == nil && rawRmqPageSize != "" {
|
||||
rmqPageSize, err := strconv.ParseInt(rawRmqPageSize, 10, 64)
|
||||
if err == nil {
|
||||
atomic.StoreInt64(&RocksmqPageSize, rmqPageSize)
|
||||
} else {
|
||||
log.Warn("rocksmq.rocksmqPageSize is invalid, using default value 2G")
|
||||
}
|
||||
}
|
||||
rawRmqRetentionTimeInMinutes, err := params.Load("rocksmq.retentionTimeInMinutes")
|
||||
if err == nil && rawRmqRetentionTimeInMinutes != "" {
|
||||
rawRmqRetentionTimeInMinutes, err := strconv.ParseInt(rawRmqRetentionTimeInMinutes, 10, 64)
|
||||
if err == nil {
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, rawRmqRetentionTimeInMinutes)
|
||||
} else {
|
||||
log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value 3 days")
|
||||
}
|
||||
}
|
||||
rawRmqRetentionSizeInMB, err := params.Load("rocksmq.retentionSizeInMB")
|
||||
if err == nil && rawRmqRetentionSizeInMB != "" {
|
||||
rawRmqRetentionSizeInMB, err := strconv.ParseInt(rawRmqRetentionSizeInMB, 10, 64)
|
||||
if err == nil {
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, rawRmqRetentionSizeInMB)
|
||||
} else {
|
||||
log.Warn("rocksmq.retentionSizeInMB is invalid, using default value 0")
|
||||
}
|
||||
}
|
||||
log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes),
|
||||
zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB), zap.Any("RocksmqPageSize", RocksmqPageSize))
|
||||
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
@ -26,10 +26,10 @@ import (
|
|||
)
|
||||
|
||||
// RocksmqRetentionTimeInMinutes is the time of retention
|
||||
var RocksmqRetentionTimeInMinutes int64
|
||||
var RocksmqRetentionTimeInMinutes int64 = 4320
|
||||
|
||||
// RocksmqRetentionSizeInMB is the size of retention
|
||||
var RocksmqRetentionSizeInMB int64
|
||||
var RocksmqRetentionSizeInMB int64 = 0
|
||||
|
||||
// Const value that used to convert unit
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue