Change some configurations, include change the defaultChannelNum to 16 (#23617)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/23760/head
smellthemoon 2023-04-27 14:26:35 +08:00 committed by GitHub
parent 016311ad48
commit 912cf4ef0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 47 additions and 11 deletions

View File

@ -115,14 +115,14 @@ rocksmq:
# please adjust in embedded Milvus: /tmp/milvus/rdb_data
path: /var/lib/milvus/rdb_data
lrucacheratio: 0.06 # rocksdb cache memory ratio
rocksmqPageSize: 268435456 # 256 MB, 256 * 1024 * 1024 bytes, The size of each page of messages in rocksmq
retentionTimeInMinutes: 7200 # 5 days, 5 * 24 * 60 minutes, The retention time of the message in rocksmq.
rocksmqPageSize: 67108864 # 64 MB, 64 * 1024 * 1024 bytes, The size of each page of messages in rocksmq
retentionTimeInMinutes: 4320 # 3 days, 3 * 24 * 60 minutes, The retention time of the message in rocksmq.
retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq.
compactionInterval: 86400 # 1 day, trigger rocksdb compaction every day to remove deleted data
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
rootCoord:
dmlChannelNum: 256 # The number of dml channels created at system startup
dmlChannelNum: 16 # The number of dml channels created at system startup
maxPartitionNum: 4096 # Maximum number of partitions in a collection
minSegmentSizeToEnableIndex: 1024 # It's a threshold. When the segment size is less than this value, the segment will not be indexed
importTaskExpiration: 900 # (in seconds) Duration after which an import task will expire (be killed). Default 900 seconds (15 minutes).

View File

@ -45,7 +45,7 @@ type UniqueID = typeutil.UniqueID
// RmqState Rocksmq state
type RmqState = int64
// RocksmqPageSize is the size of a message page, default 256MB
// RocksmqPageSize is the size of a message page, default 64MB
// RocksDB cache size limitation(TODO config it)
var RocksDBLRUCacheMinCapacity = uint64(1 << 29)

View File

@ -138,7 +138,7 @@ type dmlChannels struct {
capacity int64
// pool maintains channelName => dmlMsgStream mapping, stable
pool sync.Map
// mut protects channlsHeap only
// mut protects channelsHeap only
mut sync.Mutex
// channelsHeap is the heap to pop next dms for use
channelsHeap channelsHeap
@ -155,7 +155,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
// if topic created, use the existed topic
if params.PreCreatedTopicEnabled.GetAsBool() {
chanNamePrefix = ""
chanNum = int64(len(params.TopicNames.GetAsStrings()))
chanNum = int64(len(params.TopicNames.GetAsStrings())) + chanNumDefault
names = params.TopicNames.GetAsStrings()
} else {
chanNamePrefix = chanNamePrefixDefault

View File

@ -111,8 +111,12 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
}
func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
// if the old channels number used by the user is greater than the set default value currently
// keep the old channels
defaultChanNum := getNeedChanNum(Params.RootCoordCfg.DmlChannelNum.GetAsInt(), chanMap)
// initialize dml channels used for insert
dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml.GetValue(), Params.RootCoordCfg.DmlChannelNum.GetAsInt64())
dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml.GetValue(), int64(defaultChanNum))
// recover physical channels for all collections
for collID, chanNames := range chanMap {

View File

@ -135,3 +135,16 @@ func getTravelTs(req TimeTravelRequest) Timestamp {
func isMaxTs(ts Timestamp) bool {
return ts == typeutil.MaxTimestamp
}
func getNeedChanNum(setNum int, chanMap map[typeutil.UniqueID][]string) int {
chanNames := typeutil.NewSet[string]()
for _, chanName := range chanMap {
chanNames.Insert(chanName...)
}
ret := chanNames.Len()
if setNum > chanNames.Len() {
ret = setNum
}
return ret
}

View File

@ -148,3 +148,22 @@ func Test_isMaxTs(t *testing.T) {
})
}
}
func Test_GetNeedChanNum(t *testing.T) {
chanMap := map[typeutil.UniqueID][]string{
int64(1): {"rootcoord-dml_101"},
int64(2): {"rootcoord-dml_102"},
int64(3): {"rootcoord-dml_103"},
}
num := getNeedChanNum(2, chanMap)
assert.Equal(t, num, 3)
chanMap = map[typeutil.UniqueID][]string{
int64(1): {"rootcoord-dml_101", "rootcoord-dml_102"},
int64(2): {"rootcoord-dml_102", "rootcoord-dml_101"},
int64(3): {"rootcoord-dml_103", "rootcoord-dml_102"},
}
num = getNeedChanNum(2, chanMap)
assert.Equal(t, num, 3)
}

View File

@ -779,7 +779,7 @@ func (p *rootCoordConfig) init(base *BaseTable) {
p.DmlChannelNum = ParamItem{
Key: "rootCoord.dmlChannelNum",
Version: "2.0.0",
DefaultValue: "256",
DefaultValue: "16",
Forbidden: true,
Doc: "The number of dml channels created at system startup",
Export: true,

View File

@ -585,16 +585,16 @@ please adjust in embedded Milvus: /tmp/milvus/rdb_data`,
Key: "rocksmq.rocksmqPageSize",
DefaultValue: strconv.FormatInt(64<<20, 10),
Version: "2.0.0",
Doc: "256 MB, 256 * 1024 * 1024 bytes, The size of each page of messages in rocksmq",
Doc: "64 MB, 64 * 1024 * 1024 bytes, The size of each page of messages in rocksmq",
Export: true,
}
r.PageSize.Init(base.mgr)
r.RetentionTimeInMinutes = ParamItem{
Key: "rocksmq.retentionTimeInMinutes",
DefaultValue: "7200",
DefaultValue: "4320",
Version: "2.0.0",
Doc: "5 days, 5 * 24 * 60 minutes, The retention time of the message in rocksmq.",
Doc: "3 days, 3 * 24 * 60 minutes, The retention time of the message in rocksmq.",
Export: true,
}
r.RetentionTimeInMinutes.Init(base.mgr)