From 804c18df682c74269825d41603b564ecdb3324c5 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 26 Sep 2022 16:48:53 +0800 Subject: [PATCH] Update quota params (#19351) Signed-off-by: bigsheeper Signed-off-by: bigsheeper --- configs/milvus.yaml | 87 +++++----- internal/proxy/multi_rate_limiter.go | 2 +- internal/proxy/multi_rate_limiter_test.go | 12 +- internal/rootcoord/quota_center.go | 13 +- internal/rootcoord/quota_center_test.go | 19 +-- internal/rootcoord/root_coord.go | 2 +- internal/util/paramtable/quota_param.go | 162 +++++++++++++++++-- internal/util/paramtable/quota_param_test.go | 16 +- 8 files changed, 230 insertions(+), 83 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 5f0ea4c074..c7ce11ae4b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -378,61 +378,68 @@ common: # 3. DQL Queue length/latency protection; # If necessary, you can also manually force to deny RW requests. quotaAndLimits: - enable: false # `true` to enable quota and limits, `false` to disable. + enabled: false # `true` to enable quota and limits, `false` to disable. # quotaCenterCollectInterval is the time interval that quotaCenter # collects metrics from Query cluster and Data cluster. quotaCenterCollectInterval: 3 # seconds, (0 ~ 65536) ddl: # ddl limit rates, default no limit. - #collectionRate: # requests per minute, default no limit, rate for CreateCollection, DropCollection, HasCollection, DescribeCollection, LoadCollection, ReleaseCollection - #partitionRate: # requests per minute, default no limit, rate for CreatePartition, DropPartition, HasPartition, LoadPartition, ReleasePartition - #indexRate: # requests per minute, default no limit, rate for CreateIndex, DropIndex, DescribeIndex - #flushRate: # requests per minute, default no limit, rate for flush - #compactionRate: # requests per minute, default no limit, rate for manualCompaction + enabled: false + collectionRate: # requests per minute, default no limit, rate for CreateCollection, DropCollection, HasCollection, DescribeCollection, LoadCollection, ReleaseCollection + partitionRate: # requests per minute, default no limit, rate for CreatePartition, DropPartition, HasPartition, LoadPartition, ReleasePartition + indexRate: # requests per minute, default no limit, rate for CreateIndex, DropIndex, DescribeIndex + flushRate: # requests per minute, default no limit, rate for flush + compactionRate: # requests per minute, default no limit, rate for manualCompaction # dml limit rates, default no limit. # The maximum rate will not be greater than `max`, # and the rate after handling back pressure will not be less than `min`. dml: + enabled: false insertRate: - #max: # MB/s, default no limit - #min: # MB/s, default 0 + max: # MB/s, default no limit + min: # MB/s, default 0 deleteRate: - #max: # MB/s, default no limit - #min: # MB/s, default 0 + max: # MB/s, default no limit + min: # MB/s, default 0 bulkLoadRate: # not support yet. TODO: limit bulkLoad rate - #max: # MB/s, default no limit - #min: # MB/s, default 0 + max: # MB/s, default no limit + min: # MB/s, default 0 # dql limit rates, default no limit. # The maximum rate will not be greater than `max`, # and the rate after handling back pressure will not be less than `min`. dql: + enabled: false searchRate: - #max: # vps, default no limit - #min: # vps, default 0 + max: # vps, default no limit + min: # vps, default 0 queryRate: - #max: # qps, default no limit - #min: # qps, default 0 + max: # qps, default no limit + min: # qps, default 0 # limitWriting decides whether dml requests are allowed. limitWriting: # forceDeny `false` means dml requests are allowed (except for some # specific conditions, such as memory of nodes to water marker), `true` means always reject all dml requests. forceDeny: false - # maxTimeTickDelay indicates the backpressure for DML Operations. - # DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay, - # if time tick delay is greater than maxTimeTickDelay, all DML requests would be rejected. - maxTimeTickDelay: 30 # in seconds - # When memory usage > memoryHighWaterLevel, all dml requests would be rejected; - # When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate; - # When memory usage < memoryLowWaterLevel, no action. - # memoryLowWaterLevel should be less than memoryHighWaterLevel. - dataNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in DataNodes - dataNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in DataNodes - queryNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in QueryNodes - queryNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in QueryNodes + ttProtection: + enabled: true + # maxTimeTickDelay indicates the backpressure for DML Operations. + # DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay, + # if time tick delay is greater than maxTimeTickDelay, all DML requests would be rejected. + maxTimeTickDelay: 30 # in seconds + memProtection: + enabled: true + # When memory usage > memoryHighWaterLevel, all dml requests would be rejected; + # When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate; + # When memory usage < memoryLowWaterLevel, no action. + # memoryLowWaterLevel should be less than memoryHighWaterLevel. + dataNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in DataNodes + dataNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in DataNodes + queryNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in QueryNodes + queryNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in QueryNodes # limitReading decides whether dql requests are allowed. limitReading: @@ -440,16 +447,18 @@ quotaAndLimits: # specific conditions, such as collection has been dropped), `true` means always reject all dql requests. forceDeny: false - # NQInQueueThreshold indicated that the system was under backpressure for Search/Query path. - # If NQ in any QueryNode's queue is greater than NQInQueueThreshold, search&query rates would gradually cool off - # until the NQ in queue no longer exceeds NQInQueueThreshold. We think of the NQ of query request as 1. - #NQInQueueThreshold: # int, default no limit + queueProtection: + enabled: false + # nqInQueueThreshold indicated that the system was under backpressure for Search/Query path. + # If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off + # until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1. + nqInQueueThreshold: # int, default no limit - # queueLatencyThreshold indicated that the system was under backpressure for Search/Query path. - # If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off - # until the latency of queuing no longer exceeds queueLatencyThreshold. - # The latency here refers to the averaged latency over a period of time. - #queueLatencyThreshold: # milliseconds, default no limit + # queueLatencyThreshold indicated that the system was under backpressure for Search/Query path. + # If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off + # until the latency of queuing no longer exceeds queueLatencyThreshold. + # The latency here refers to the averaged latency over a period of time. + queueLatencyThreshold: # milliseconds, default no limit - # coolOffSpeed is the speed of search&query rates cool off. - #coolOffSpeed: 0.9 # (0, 1] + # coolOffSpeed is the speed of search&query rates cool off. + coolOffSpeed: 0.9 # (0, 1] diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index 5b36d13338..49986c258d 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -45,7 +45,7 @@ func NewMultiRateLimiter() *MultiRateLimiter { // Limit returns true, the request will be rejected. // Otherwise, the request will pass. Limit also returns limit of limiter. func (m *MultiRateLimiter) Limit(rt internalpb.RateType, n int) (bool, float64) { - if !Params.QuotaConfig.EnableQuotaAndLimits { + if !Params.QuotaConfig.QuotaAndLimitsEnabled { return false, 1 // no limit } // TODO: call other rate limiters diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index 7151b4365f..cdd0c88c6c 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -28,8 +28,8 @@ import ( func TestMultiRateLimiter(t *testing.T) { Params.Init() t.Run("test multiRateLimiter", func(t *testing.T) { - bak := Params.QuotaConfig.EnableQuotaAndLimits - Params.QuotaConfig.EnableQuotaAndLimits = true + bak := Params.QuotaConfig.QuotaAndLimitsEnabled + Params.QuotaConfig.QuotaAndLimitsEnabled = true multiLimiter := NewMultiRateLimiter() for _, rt := range internalpb.RateType_value { multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1) @@ -42,17 +42,17 @@ func TestMultiRateLimiter(t *testing.T) { ok, _ = multiLimiter.Limit(internalpb.RateType(rt), math.MaxInt) assert.True(t, ok) } - Params.QuotaConfig.EnableQuotaAndLimits = bak + Params.QuotaConfig.QuotaAndLimitsEnabled = bak }) t.Run("not enable quotaAndLimit", func(t *testing.T) { multiLimiter := NewMultiRateLimiter() - bak := Params.QuotaConfig.EnableQuotaAndLimits - Params.QuotaConfig.EnableQuotaAndLimits = false + bak := Params.QuotaConfig.QuotaAndLimitsEnabled + Params.QuotaConfig.QuotaAndLimitsEnabled = false ok, r := multiLimiter.Limit(internalpb.RateType(0), 1) assert.False(t, ok) assert.NotEqual(t, float64(0), r) - Params.QuotaConfig.EnableQuotaAndLimits = bak + Params.QuotaConfig.QuotaAndLimitsEnabled = bak }) } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index a3aad03430..4f02d49270 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -277,8 +277,11 @@ func (q *QuotaCenter) calculateReadRates() { q.forceDenyReading(ManualForceDeny) return } - coolOffSpeed := Params.QuotaConfig.CoolOffSpeed + if !Params.QuotaConfig.QueueProtectionEnabled { + return + } + coolOffSpeed := Params.QuotaConfig.CoolOffSpeed coolOff := func(realTimeSearchRate float64, realTimeQueryRate float64) { if q.currentRates[internalpb.RateType_DQLSearch] != Inf { q.currentRates[internalpb.RateType_DQLSearch] = Limit(realTimeSearchRate * coolOffSpeed) @@ -384,6 +387,10 @@ func (q *QuotaCenter) resetCurrentRates() { // timeTickDelay gets time tick delay of DataNodes and QueryNodes, // and return the factor according to max tolerable time tick delay. func (q *QuotaCenter) timeTickDelay() (float64, error) { + if !Params.QuotaConfig.TtProtectionEnabled { + return 1, nil + } + maxTt := Params.QuotaConfig.MaxTimeTickDelay if maxTt < 0 { // < 0 means disable tt protection @@ -465,6 +472,10 @@ func (q *QuotaCenter) checkQueryLatency() float64 { // and return the factor according to max memory water level. func (q *QuotaCenter) memoryToWaterLevel() float64 { factor := float64(1) + if !Params.QuotaConfig.MemProtectionEnabled { + return 1 + } + dataNodeMemoryLowWaterLevel := Params.QuotaConfig.DataNodeMemoryLowWaterLevel dataNodeMemoryHighWaterLevel := Params.QuotaConfig.DataNodeMemoryHighWaterLevel queryNodeMemoryLowWaterLevel := Params.QuotaConfig.QueryNodeMemoryLowWaterLevel diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index b675ee9c49..a9a45bf6e9 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -144,8 +144,9 @@ func TestQuotaCenter(t *testing.T) { now := time.Now() - bak := Params.QuotaConfig.MaxTimeTickDelay + Params.QuotaConfig.TtProtectionEnabled = true Params.QuotaConfig.MaxTimeTickDelay = 3 * time.Second + // test force deny writing alloc := newMockTsoAllocator() alloc.GenerateTSOF = func(count uint32) (typeutil.Timestamp, error) { @@ -188,7 +189,6 @@ func TestQuotaCenter(t *testing.T) { } _, err = quotaCenter.timeTickDelay() assert.Error(t, err) - Params.QuotaConfig.MaxTimeTickDelay = bak }) t.Run("test checkNQInQuery", func(t *testing.T) { @@ -197,7 +197,7 @@ func TestQuotaCenter(t *testing.T) { assert.Equal(t, float64(1), factor) // test cool off - bak := Params.QuotaConfig.NQInQueueThreshold + Params.QuotaConfig.QueueProtectionEnabled = true Params.QuotaConfig.NQInQueueThreshold = 100 quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{ SearchQueue: metricsinfo.ReadInfoInQueue{ @@ -217,8 +217,6 @@ func TestQuotaCenter(t *testing.T) { assert.Equal(t, 1.0, factor) //ok := math.Abs(factor-1.0) < 0.0001 //assert.True(t, ok) - - Params.QuotaConfig.NQInQueueThreshold = bak }) t.Run("test checkQueryLatency", func(t *testing.T) { @@ -227,8 +225,9 @@ func TestQuotaCenter(t *testing.T) { assert.Equal(t, float64(1), factor) // test cool off - bak := Params.QuotaConfig.QueueLatencyThreshold + Params.QuotaConfig.QueueProtectionEnabled = true Params.QuotaConfig.QueueLatencyThreshold = float64(3 * time.Second) + quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{ SearchQueue: metricsinfo.ReadInfoInQueue{ AvgQueueDuration: time.Duration(Params.QuotaConfig.QueueLatencyThreshold), @@ -247,8 +246,6 @@ func TestQuotaCenter(t *testing.T) { assert.Equal(t, 1.0, factor) //ok := math.Abs(factor-1.0) < 0.0001 //assert.True(t, ok) - - Params.QuotaConfig.QueueLatencyThreshold = bak }) t.Run("test calculateReadRates", func(t *testing.T) { @@ -260,7 +257,8 @@ func TestQuotaCenter(t *testing.T) { }, }} - latencyBak := Params.QuotaConfig.QueueLatencyThreshold + Params.QuotaConfig.ForceDenyReading = false + Params.QuotaConfig.QueueProtectionEnabled = true Params.QuotaConfig.QueueLatencyThreshold = 100 quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{ SearchQueue: metricsinfo.ReadInfoInQueue{ @@ -270,9 +268,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.calculateReadRates() assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch]) assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) - Params.QuotaConfig.QueueLatencyThreshold = latencyBak - queueBak := Params.QuotaConfig.NQInQueueThreshold Params.QuotaConfig.NQInQueueThreshold = 100 quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{ SearchQueue: metricsinfo.ReadInfoInQueue{ @@ -282,7 +278,6 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.calculateReadRates() assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch]) assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) - Params.QuotaConfig.NQInQueueThreshold = queueBak }) t.Run("test calculateWriteRates", func(t *testing.T) { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index a9e342d137..380c8c9de9 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -619,7 +619,7 @@ func (c *Core) startInternal() error { go c.importManager.expireOldTasksLoop(&c.wg, c.broker.ReleaseSegRefLock) go c.importManager.sendOutTasksLoop(&c.wg) - if Params.QuotaConfig.EnableQuotaAndLimits { + if Params.QuotaConfig.QuotaAndLimitsEnabled { go c.quotaCenter.run() } diff --git a/internal/util/paramtable/quota_param.go b/internal/util/paramtable/quota_param.go index bb320de2c0..a001867a04 100644 --- a/internal/util/paramtable/quota_param.go +++ b/internal/util/paramtable/quota_param.go @@ -42,11 +42,11 @@ type quotaConfig struct { Base *BaseTable once sync.Once - EnableQuotaAndLimits bool - + QuotaAndLimitsEnabled bool QuotaCenterCollectInterval float64 // ddl + DDLLimitEnabled bool DDLCollectionRate float64 DDLPartitionRate float64 DDLIndexRate float64 @@ -54,6 +54,7 @@ type quotaConfig struct { DDLCompactionRate float64 // dml + DMLLimitEnabled bool DMLMaxInsertRate float64 DMLMinInsertRate float64 DMLMaxDeleteRate float64 @@ -62,6 +63,7 @@ type quotaConfig struct { DMLMinBulkLoadRate float64 // dql + DQLLimitEnabled bool DQLMaxSearchRate float64 DQLMinSearchRate float64 DQLMaxQueryRate float64 @@ -70,31 +72,40 @@ type quotaConfig struct { // limits MaxCollectionNum int + // limit writing ForceDenyWriting bool + TtProtectionEnabled bool MaxTimeTickDelay time.Duration + MemProtectionEnabled bool DataNodeMemoryLowWaterLevel float64 DataNodeMemoryHighWaterLevel float64 QueryNodeMemoryLowWaterLevel float64 QueryNodeMemoryHighWaterLevel float64 - ForceDenyReading bool - NQInQueueThreshold int64 - QueueLatencyThreshold float64 - CoolOffSpeed float64 + // limit reading + ForceDenyReading bool + QueueProtectionEnabled bool + NQInQueueThreshold int64 + QueueLatencyThreshold float64 + CoolOffSpeed float64 } func (p *quotaConfig) init(base *BaseTable) { p.Base = base - p.initEnableQuotaAndLimits() + p.initQuotaAndLimitsEnabled() p.initQuotaCenterCollectInterval() + // ddl + p.initDDLLimitEnabled() p.initDDLCollectionRate() p.initDDLPartitionRate() p.initDDLIndexRate() p.initDDLFlushRate() p.initDDLCompactionRate() + // dml + p.initDMLLimitEnabled() p.initDMLMaxInsertRate() p.initDMLMinInsertRate() p.initDMLMaxDeleteRate() @@ -102,28 +113,36 @@ func (p *quotaConfig) init(base *BaseTable) { p.initDMLMaxBulkLoadRate() p.initDMLMinBulkLoadRate() + // dql + p.initDQLLimitEnabled() p.initDQLMaxSearchRate() p.initDQLMinSearchRate() p.initDQLMaxQueryRate() p.initDQLMinQueryRate() + // limits p.initMaxCollectionNum() + // limit writing p.initForceDenyWriting() + p.initTtProtectionEnabled() p.initMaxTimeTickDelay() + p.initMemProtectionEnabled() p.initDataNodeMemoryLowWaterLevel() p.initDataNodeMemoryHighWaterLevel() p.initQueryNodeMemoryLowWaterLevel() p.initQueryNodeMemoryHighWaterLevel() + // limit reading p.initForceDenyReading() + p.initQueueProtectionEnabled() p.initNQInQueueThreshold() p.initQueueLatencyThreshold() p.initCoolOffSpeed() } -func (p *quotaConfig) initEnableQuotaAndLimits() { - p.EnableQuotaAndLimits = p.Base.ParseBool("quotaAndLimits.enable", false) +func (p *quotaConfig) initQuotaAndLimitsEnabled() { + p.QuotaAndLimitsEnabled = p.Base.ParseBool("quotaAndLimits.enabled", false) } func (p *quotaConfig) initQuotaCenterCollectInterval() { @@ -135,7 +154,15 @@ func (p *quotaConfig) initQuotaCenterCollectInterval() { } } +func (p *quotaConfig) initDDLLimitEnabled() { + p.DDLLimitEnabled = p.Base.ParseBool("quotaAndLimits.ddl.enabled", false) +} + func (p *quotaConfig) initDDLCollectionRate() { + if !p.DDLLimitEnabled { + p.DDLCollectionRate = defaultMax + return + } p.DDLCollectionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.collectionRate", defaultMax) // [0 ~ Inf) if p.DDLCollectionRate < 0 { @@ -144,6 +171,10 @@ func (p *quotaConfig) initDDLCollectionRate() { } func (p *quotaConfig) initDDLPartitionRate() { + if !p.DDLLimitEnabled { + p.DDLPartitionRate = defaultMax + return + } p.DDLPartitionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.partitionRate", defaultMax) // [0 ~ Inf) if p.DDLPartitionRate < 0 { @@ -152,6 +183,10 @@ func (p *quotaConfig) initDDLPartitionRate() { } func (p *quotaConfig) initDDLIndexRate() { + if !p.DDLLimitEnabled { + p.DDLIndexRate = defaultMax + return + } p.DDLIndexRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.indexRate", defaultMax) // [0 ~ Inf) if p.DDLIndexRate < 0 { @@ -160,6 +195,10 @@ func (p *quotaConfig) initDDLIndexRate() { } func (p *quotaConfig) initDDLFlushRate() { + if !p.DDLLimitEnabled { + p.DDLFlushRate = defaultMax + return + } p.DDLFlushRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.flushRate", defaultMax) // [0 ~ Inf) if p.DDLFlushRate < 0 { @@ -168,6 +207,10 @@ func (p *quotaConfig) initDDLFlushRate() { } func (p *quotaConfig) initDDLCompactionRate() { + if !p.DDLLimitEnabled { + p.DDLCompactionRate = defaultMax + return + } p.DDLCompactionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.compactionRate", defaultMax) // [0 ~ Inf) if p.DDLCompactionRate < 0 { @@ -188,7 +231,15 @@ func (p *quotaConfig) checkMinMaxLegal(min, max float64) bool { return true } +func (p *quotaConfig) initDMLLimitEnabled() { + p.DMLLimitEnabled = p.Base.ParseBool("quotaAndLimits.dml.enabled", false) +} + func (p *quotaConfig) initDMLMaxInsertRate() { + if !p.DMLLimitEnabled { + p.DMLMaxInsertRate = defaultMax + return + } p.DMLMaxInsertRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.insertRate.max", defaultMax) if math.Abs(p.DMLMaxInsertRate-defaultMax) > 0.001 { // maxRate != defaultMax p.DMLMaxInsertRate = megaBytesRate2Bytes(p.DMLMaxInsertRate) @@ -200,6 +251,10 @@ func (p *quotaConfig) initDMLMaxInsertRate() { } func (p *quotaConfig) initDMLMinInsertRate() { + if !p.DMLLimitEnabled { + p.DMLMinInsertRate = defaultMin + return + } p.DMLMinInsertRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.insertRate.min", defaultMin) p.DMLMinInsertRate = megaBytesRate2Bytes(p.DMLMinInsertRate) // [0, inf) @@ -213,6 +268,10 @@ func (p *quotaConfig) initDMLMinInsertRate() { } func (p *quotaConfig) initDMLMaxDeleteRate() { + if !p.DMLLimitEnabled { + p.DMLMaxDeleteRate = defaultMax + return + } p.DMLMaxDeleteRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.deleteRate.max", defaultMax) if math.Abs(p.DMLMaxDeleteRate-defaultMax) > 0.001 { // maxRate != defaultMax p.DMLMaxDeleteRate = megaBytesRate2Bytes(p.DMLMaxDeleteRate) @@ -224,6 +283,10 @@ func (p *quotaConfig) initDMLMaxDeleteRate() { } func (p *quotaConfig) initDMLMinDeleteRate() { + if !p.DMLLimitEnabled { + p.DMLMinDeleteRate = defaultMin + return + } p.DMLMinDeleteRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.deleteRate.min", defaultMin) p.DMLMinDeleteRate = megaBytesRate2Bytes(p.DMLMinDeleteRate) // [0, inf) @@ -237,6 +300,10 @@ func (p *quotaConfig) initDMLMinDeleteRate() { } func (p *quotaConfig) initDMLMaxBulkLoadRate() { + if !p.DMLLimitEnabled { + p.DMLMaxBulkLoadRate = defaultMax + return + } p.DMLMaxBulkLoadRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.bulkLoadRate.max", defaultMax) if math.Abs(p.DMLMaxBulkLoadRate-defaultMax) > 0.001 { // maxRate != defaultMax p.DMLMaxBulkLoadRate = megaBytesRate2Bytes(p.DMLMaxBulkLoadRate) @@ -248,6 +315,10 @@ func (p *quotaConfig) initDMLMaxBulkLoadRate() { } func (p *quotaConfig) initDMLMinBulkLoadRate() { + if !p.DMLLimitEnabled { + p.DMLMinBulkLoadRate = defaultMin + return + } p.DMLMinBulkLoadRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.bulkLoadRate.min", defaultMin) p.DMLMinBulkLoadRate = megaBytesRate2Bytes(p.DMLMinBulkLoadRate) // [0, inf) @@ -260,7 +331,15 @@ func (p *quotaConfig) initDMLMinBulkLoadRate() { } } +func (p *quotaConfig) initDQLLimitEnabled() { + p.DQLLimitEnabled = p.Base.ParseBool("quotaAndLimits.dql.enabled", false) +} + func (p *quotaConfig) initDQLMaxSearchRate() { + if !p.DQLLimitEnabled { + p.DQLMaxSearchRate = defaultMax + return + } p.DQLMaxSearchRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.searchRate.max", defaultMax) // [0, inf) if p.DQLMaxSearchRate < 0 { @@ -269,6 +348,10 @@ func (p *quotaConfig) initDQLMaxSearchRate() { } func (p *quotaConfig) initDQLMinSearchRate() { + if !p.DQLLimitEnabled { + p.DQLMinSearchRate = defaultMin + return + } p.DQLMinSearchRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.searchRate.min", defaultMin) // [0, inf) if p.DQLMinSearchRate < 0 { @@ -281,6 +364,10 @@ func (p *quotaConfig) initDQLMinSearchRate() { } func (p *quotaConfig) initDQLMaxQueryRate() { + if !p.DQLLimitEnabled { + p.DQLMaxQueryRate = defaultMax + return + } p.DQLMaxQueryRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.queryRate.max", defaultMax) // [0, inf) if p.DQLMaxQueryRate < 0 { @@ -289,6 +376,10 @@ func (p *quotaConfig) initDQLMaxQueryRate() { } func (p *quotaConfig) initDQLMinQueryRate() { + if !p.DQLLimitEnabled { + p.DQLMinQueryRate = defaultMin + return + } p.DQLMinQueryRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.queryRate.min", defaultMin) // [0, inf) if p.DQLMinQueryRate < 0 { @@ -308,9 +399,16 @@ func (p *quotaConfig) initForceDenyWriting() { p.ForceDenyWriting = p.Base.ParseBool("quotaAndLimits.limitWriting.forceDeny", false) } +func (p *quotaConfig) initTtProtectionEnabled() { + p.TtProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitWriting.ttProtection", true) +} + func (p *quotaConfig) initMaxTimeTickDelay() { + if !p.TtProtectionEnabled { + return + } const defaultMaxTtDelay = 30.0 - delay := p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.maxTimeTickDelay", defaultMaxTtDelay) + delay := p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.ttProtection.maxTimeTickDelay", defaultMaxTtDelay) // (0, 65536) if delay <= 0 || delay >= 65536 { delay = defaultMaxTtDelay @@ -318,8 +416,15 @@ func (p *quotaConfig) initMaxTimeTickDelay() { p.MaxTimeTickDelay = time.Duration(delay * float64(time.Second)) } +func (p *quotaConfig) initMemProtectionEnabled() { + p.MemProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitWriting.memProtection.enabled", true) +} + func (p *quotaConfig) initDataNodeMemoryLowWaterLevel() { - p.DataNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.dataNodeMemoryLowWaterLevel", defaultLowWaterLevel) + if !p.MemProtectionEnabled { + return + } + p.DataNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.dataNodeMemoryLowWaterLevel", defaultLowWaterLevel) // (0, 1] if p.DataNodeMemoryLowWaterLevel <= 0 || p.DataNodeMemoryLowWaterLevel > 1 { log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.DataNodeMemoryLowWaterLevel), zap.Float64("default", defaultLowWaterLevel)) @@ -328,7 +433,10 @@ func (p *quotaConfig) initDataNodeMemoryLowWaterLevel() { } func (p *quotaConfig) initDataNodeMemoryHighWaterLevel() { - p.DataNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.dataNodeMemoryHighWaterLevel", defaultHighWaterLevel) + if !p.MemProtectionEnabled { + return + } + p.DataNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.dataNodeMemoryHighWaterLevel", defaultHighWaterLevel) // (0, 1] if p.DataNodeMemoryHighWaterLevel <= 0 || p.DataNodeMemoryHighWaterLevel > 1 { log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.DataNodeMemoryHighWaterLevel), zap.Float64("default", defaultHighWaterLevel)) @@ -341,7 +449,10 @@ func (p *quotaConfig) initDataNodeMemoryHighWaterLevel() { } func (p *quotaConfig) initQueryNodeMemoryLowWaterLevel() { - p.QueryNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.queryNodeMemoryLowWaterLevel", defaultLowWaterLevel) + if !p.MemProtectionEnabled { + return + } + p.QueryNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.queryNodeMemoryLowWaterLevel", defaultLowWaterLevel) // (0, 1] if p.QueryNodeMemoryLowWaterLevel <= 0 || p.QueryNodeMemoryLowWaterLevel > 1 { log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.QueryNodeMemoryLowWaterLevel), zap.Float64("default", defaultLowWaterLevel)) @@ -350,7 +461,10 @@ func (p *quotaConfig) initQueryNodeMemoryLowWaterLevel() { } func (p *quotaConfig) initQueryNodeMemoryHighWaterLevel() { - p.QueryNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.queryNodeMemoryHighWaterLevel", defaultHighWaterLevel) + if !p.MemProtectionEnabled { + return + } + p.QueryNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.queryNodeMemoryHighWaterLevel", defaultHighWaterLevel) // (0, 1] if p.QueryNodeMemoryHighWaterLevel <= 0 || p.QueryNodeMemoryHighWaterLevel > 1 { log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.QueryNodeMemoryHighWaterLevel), zap.Float64("default", defaultHighWaterLevel)) @@ -366,8 +480,15 @@ func (p *quotaConfig) initForceDenyReading() { p.ForceDenyReading = p.Base.ParseBool("quotaAndLimits.limitReading.forceDeny", false) } +func (p *quotaConfig) initQueueProtectionEnabled() { + p.QueueProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitReading.queueProtection.enabled", false) +} + func (p *quotaConfig) initNQInQueueThreshold() { - p.NQInQueueThreshold = p.Base.ParseInt64WithDefault("quotaAndLimits.limitReading.NQInQueueThreshold", math.MaxInt64) + if !p.QueueProtectionEnabled { + return + } + p.NQInQueueThreshold = p.Base.ParseInt64WithDefault("quotaAndLimits.limitReading.queueProtection.nqInQueueThreshold", math.MaxInt64) // [0, inf) if p.NQInQueueThreshold < 0 { p.NQInQueueThreshold = math.MaxInt64 @@ -375,7 +496,10 @@ func (p *quotaConfig) initNQInQueueThreshold() { } func (p *quotaConfig) initQueueLatencyThreshold() { - p.QueueLatencyThreshold = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueLatencyThreshold", defaultMax) + if !p.QueueProtectionEnabled { + return + } + p.QueueLatencyThreshold = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.queueLatencyThreshold", defaultMax) // [0, inf) if p.QueueLatencyThreshold < 0 { p.QueueLatencyThreshold = defaultMax @@ -384,7 +508,11 @@ func (p *quotaConfig) initQueueLatencyThreshold() { func (p *quotaConfig) initCoolOffSpeed() { const defaultSpeed = 0.9 - p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.coolOffSpeed", defaultSpeed) + p.CoolOffSpeed = defaultSpeed + if !p.QueueProtectionEnabled { + return + } + p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.coolOffSpeed", defaultSpeed) // (0, 1] if p.CoolOffSpeed <= 0 || p.CoolOffSpeed > 1 { log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed)) diff --git a/internal/util/paramtable/quota_param_test.go b/internal/util/paramtable/quota_param_test.go index dbb3d4ce35..3b10557eb6 100644 --- a/internal/util/paramtable/quota_param_test.go +++ b/internal/util/paramtable/quota_param_test.go @@ -17,7 +17,6 @@ package paramtable import ( - "math" "testing" "time" @@ -29,11 +28,12 @@ func TestQuotaParam(t *testing.T) { qc.init(&baseParams) t.Run("test quota", func(t *testing.T) { - assert.False(t, qc.EnableQuotaAndLimits) + assert.False(t, qc.QuotaAndLimitsEnabled) assert.Equal(t, float64(3), qc.QuotaCenterCollectInterval) }) t.Run("test ddl", func(t *testing.T) { + assert.Equal(t, false, qc.DDLLimitEnabled) assert.Equal(t, defaultMax, qc.DDLCollectionRate) assert.Equal(t, defaultMax, qc.DDLPartitionRate) assert.Equal(t, defaultMax, qc.DDLIndexRate) @@ -42,6 +42,7 @@ func TestQuotaParam(t *testing.T) { }) t.Run("test dml", func(t *testing.T) { + assert.Equal(t, false, qc.DMLLimitEnabled) assert.Equal(t, defaultMax, qc.DMLMaxInsertRate) assert.Equal(t, defaultMin, qc.DMLMinInsertRate) assert.Equal(t, defaultMax, qc.DMLMaxDeleteRate) @@ -51,6 +52,7 @@ func TestQuotaParam(t *testing.T) { }) t.Run("test dql", func(t *testing.T) { + assert.Equal(t, false, qc.DQLLimitEnabled) assert.Equal(t, defaultMax, qc.DQLMaxSearchRate) assert.Equal(t, defaultMin, qc.DQLMinSearchRate) assert.Equal(t, defaultMax, qc.DQLMaxQueryRate) @@ -61,8 +63,9 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, 64, qc.MaxCollectionNum) }) - t.Run("test force deny writing", func(t *testing.T) { + t.Run("test limit writing", func(t *testing.T) { assert.False(t, qc.ForceDenyWriting) + assert.Equal(t, true, qc.TtProtectionEnabled) assert.Equal(t, 30*time.Second, qc.MaxTimeTickDelay) assert.Equal(t, defaultLowWaterLevel, qc.DataNodeMemoryLowWaterLevel) assert.Equal(t, defaultHighWaterLevel, qc.DataNodeMemoryHighWaterLevel) @@ -70,10 +73,11 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, defaultHighWaterLevel, qc.QueryNodeMemoryHighWaterLevel) }) - t.Run("test force deny reading", func(t *testing.T) { + t.Run("test limit reading", func(t *testing.T) { assert.False(t, qc.ForceDenyReading) - assert.Equal(t, int64(math.MaxInt64), qc.NQInQueueThreshold) - assert.Equal(t, float64(defaultMax), qc.QueueLatencyThreshold) + assert.Equal(t, false, qc.QueueProtectionEnabled) + assert.Equal(t, int64(0), qc.NQInQueueThreshold) + assert.Equal(t, float64(0), qc.QueueLatencyThreshold) assert.Equal(t, 0.9, qc.CoolOffSpeed) }) }