enhance: [10kcp] Skip creating partition rate limiters when not enable (#38062)

issue: https://github.com/milvus-io/milvus/issues/37630

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/38133/head
yihao.dai 2024-11-28 10:45:46 +08:00 committed by GitHub
parent 635d161109
commit 0930430a68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 42 additions and 19 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -110,9 +111,9 @@ func (m *dataViewManager) Close() {
})
}
func (m *dataViewManager) update(view *DataView) {
func (m *dataViewManager) update(view *DataView, reason string) {
m.currentViews.Insert(view.CollectionID, view)
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version))
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version), zap.String("reason", reason))
}
func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
@ -127,7 +128,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
currentView, ok := m.currentViews.Get(collectionID)
if !ok {
// update due to data view is empty
m.update(newView)
m.update(newView, "init data view")
return
}
// no-op if the incoming version is less than the current version.
@ -141,7 +142,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
current, ok := currentView.Channels[channel]
if !ok {
// update due to channel info is empty
m.update(newView)
m.update(newView, "init channel info")
return
}
if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) ||
@ -150,24 +151,25 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
!funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) {
// update due to segments list changed
m.update(newView)
m.update(newView, "channel segments list changed")
return
}
if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) {
// update due to partition stats changed
m.update(newView)
m.update(newView, "partition stats changed")
return
}
// TODO: It might be too frequent.
if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() {
newTime := tsoutil.PhysicalTime(new.GetSeekPosition().GetTimestamp())
curTime := tsoutil.PhysicalTime(current.GetSeekPosition().GetTimestamp())
if newTime.Sub(curTime) > paramtable.Get().DataCoordCfg.CPIntervalToUpdateDataView.GetAsDuration(time.Second) {
// update due to channel cp advanced
m.update(newView)
m.update(newView, "channel cp advanced")
return
}
}
if !typeutil.MapEqual(newView.Segments, currentView.Segments) {
// update due to segments list changed
m.update(newView)
m.update(newView, "segment list changed")
}
}

View File

@ -1170,6 +1170,15 @@ func (q *QuotaCenter) calculateRates() error {
func (q *QuotaCenter) resetAllCurrentRates() error {
clusterLimiter := newParamLimiterFunc(internalpb.RateScope_Cluster, allOps)()
q.rateLimiter = rlinternal.NewRateLimiterTree(clusterLimiter)
enablePartitionRateLimit := false
for rt := range getRateTypes(internalpb.RateScope_Partition, allOps) {
r := quota.GetQuotaValue(internalpb.RateScope_Partition, rt, Params)
if Limit(r) != Inf {
enablePartitionRateLimit = true
}
}
initLimiters := func(sourceCollections map[int64]map[int64][]int64) {
for dbID, collections := range sourceCollections {
for collectionID, partitionIDs := range collections {
@ -1180,21 +1189,20 @@ func (q *QuotaCenter) resetAllCurrentRates() error {
}
return limitVal
}
q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps))
q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal))
if !enablePartitionRateLimit {
continue
}
for _, partitionID := range partitionIDs {
q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal),
newParamLimiterFunc(internalpb.RateScope_Partition, allOps))
}
if len(partitionIDs) == 0 {
q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal))
}
}
if len(collections) == 0 {
q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps))
}
}
}

View File

@ -3254,7 +3254,8 @@ type dataCoordConfig struct {
L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"`
// data view
DataViewUpdateInterval ParamItem `refreshable:"true"`
DataViewUpdateInterval ParamItem `refreshable:"true"`
CPIntervalToUpdateDataView ParamItem `refreshable:"true"`
}
func (p *dataCoordConfig) init(base *BaseTable) {
@ -4107,6 +4108,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Export: false,
}
p.DataViewUpdateInterval.Init(base.mgr)
p.CPIntervalToUpdateDataView = ParamItem{
Key: "dataCoord.dataView.cpInterval",
Version: "2.5.0",
Doc: "cpInterval is a time interval in seconds. If the time interval between the new channel checkpoint and the current channel checkpoint exceeds cpInterval, it will trigger a data view update.",
DefaultValue: "600",
PanicIfEmpty: false,
Export: false,
}
p.CPIntervalToUpdateDataView.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -507,6 +507,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt())
params.Save("datacoord.scheduler.taskSlowThreshold", "1000")
assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second))
assert.Equal(t, 10*time.Second, Params.DataViewUpdateInterval.GetAsDuration(time.Second))
assert.Equal(t, 600*time.Second, Params.CPIntervalToUpdateDataView.GetAsDuration(time.Second))
})
t.Run("test dataNodeConfig", func(t *testing.T) {