Update quota params (#19351)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

bigsheeper 2022-09-26 16:48:53 +08:00 committed by GitHub
parent c15b880f0e
commit 804c18df68
8 changed files with 230 additions and 83 deletions

View File

@ -378,61 +378,68 @@ common:
# 3. DQL Queue length/latency protection;
# If necessary, you can also manually force to deny RW requests.
quotaAndLimits:
enable: false # `true` to enable quota and limits, `false` to disable.
enabled: false # `true` to enable quota and limits, `false` to disable.
# quotaCenterCollectInterval is the time interval at which quotaCenter
# collects metrics from the Query cluster and Data cluster.
quotaCenterCollectInterval: 3 # seconds, (0 ~ 65536)
ddl: # ddl limit rates, default no limit.
#collectionRate: # requests per minute, default no limit, rate for CreateCollection, DropCollection, HasCollection, DescribeCollection, LoadCollection, ReleaseCollection
#partitionRate: # requests per minute, default no limit, rate for CreatePartition, DropPartition, HasPartition, LoadPartition, ReleasePartition
#indexRate: # requests per minute, default no limit, rate for CreateIndex, DropIndex, DescribeIndex
#flushRate: # requests per minute, default no limit, rate for flush
#compactionRate: # requests per minute, default no limit, rate for manualCompaction
enabled: false
collectionRate: # requests per minute, default no limit, rate for CreateCollection, DropCollection, HasCollection, DescribeCollection, LoadCollection, ReleaseCollection
partitionRate: # requests per minute, default no limit, rate for CreatePartition, DropPartition, HasPartition, LoadPartition, ReleasePartition
indexRate: # requests per minute, default no limit, rate for CreateIndex, DropIndex, DescribeIndex
flushRate: # requests per minute, default no limit, rate for flush
compactionRate: # requests per minute, default no limit, rate for manualCompaction
# dml limit rates, default no limit.
# The maximum rate will not be greater than `max`,
# and the rate after handling back pressure will not be less than `min`.
dml:
enabled: false
insertRate:
#max: # MB/s, default no limit
#min: # MB/s, default 0
max: # MB/s, default no limit
min: # MB/s, default 0
deleteRate:
#max: # MB/s, default no limit
#min: # MB/s, default 0
max: # MB/s, default no limit
min: # MB/s, default 0
bulkLoadRate: # not supported yet. TODO: limit bulkLoad rate
#max: # MB/s, default no limit
#min: # MB/s, default 0
max: # MB/s, default no limit
min: # MB/s, default 0
# dql limit rates, default no limit.
# The maximum rate will not be greater than `max`,
# and the rate after handling back pressure will not be less than `min`.
dql:
enabled: false
searchRate:
#max: # vps, default no limit
#min: # vps, default 0
max: # vps, default no limit
min: # vps, default 0
queryRate:
#max: # qps, default no limit
#min: # qps, default 0
max: # qps, default no limit
min: # qps, default 0
# limitWriting decides whether dml requests are allowed.
limitWriting:
# forceDeny `false` means dml requests are allowed (except for some
# specific conditions, such as node memory reaching the water level), `true` means always reject all dml requests.
forceDeny: false
# maxTimeTickDelay indicates the backpressure for DML Operations.
# DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay,
# if time tick delay is greater than maxTimeTickDelay, all DML requests would be rejected.
maxTimeTickDelay: 30 # in seconds
# When memory usage > memoryHighWaterLevel, all dml requests would be rejected;
# When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate;
# When memory usage < memoryLowWaterLevel, no action.
# memoryLowWaterLevel should be less than memoryHighWaterLevel.
dataNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in DataNodes
dataNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in DataNodes
queryNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in QueryNodes
queryNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in QueryNodes
ttProtection:
enabled: true
# maxTimeTickDelay indicates the backpressure for DML Operations.
# DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay,
# if time tick delay is greater than maxTimeTickDelay, all DML requests would be rejected.
maxTimeTickDelay: 30 # in seconds
memProtection:
enabled: true
# When memory usage > memoryHighWaterLevel, all dml requests would be rejected;
# When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate;
# When memory usage < memoryLowWaterLevel, no action.
# memoryLowWaterLevel should be less than memoryHighWaterLevel.
dataNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in DataNodes
dataNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in DataNodes
queryNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in QueryNodes
queryNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in QueryNodes
# limitReading decides whether dql requests are allowed.
limitReading:
@ -440,16 +447,18 @@ quotaAndLimits:
# specific conditions, such as the collection having been dropped), `true` means always reject all dql requests.
forceDeny: false
# NQInQueueThreshold indicates that the system is under backpressure for the Search/Query path.
# If the NQ in any QueryNode's queue is greater than NQInQueueThreshold, search&query rates will gradually cool off
# until the NQ in queue no longer exceeds NQInQueueThreshold. The NQ of a query request is counted as 1.
#NQInQueueThreshold: # int, default no limit
queueProtection:
enabled: false
# nqInQueueThreshold indicates that the system is under backpressure for the Search/Query path.
# If the NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates will gradually cool off
# until the NQ in queue no longer exceeds nqInQueueThreshold. The NQ of a query request is counted as 1.
nqInQueueThreshold: # int, default no limit
# queueLatencyThreshold indicates that the system is under backpressure for the Search/Query path.
# If the DQL queuing latency is greater than queueLatencyThreshold, search&query rates will gradually cool off
# until the queuing latency no longer exceeds queueLatencyThreshold.
# The latency here refers to the latency averaged over a period of time.
#queueLatencyThreshold: # milliseconds, default no limit
# queueLatencyThreshold indicates that the system is under backpressure for the Search/Query path.
# If the DQL queuing latency is greater than queueLatencyThreshold, search&query rates will gradually cool off
# until the queuing latency no longer exceeds queueLatencyThreshold.
# The latency here refers to the latency averaged over a period of time.
queueLatencyThreshold: # milliseconds, default no limit
# coolOffSpeed is the speed at which search&query rates cool off.
#coolOffSpeed: 0.9 # (0, 1]
# coolOffSpeed is the speed at which search&query rates cool off.
coolOffSpeed: 0.9 # (0, 1]
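
The queueProtection settings above drive a multiplicative cool-off on read rates. Below is a minimal, self-contained Go sketch of that behavior, for illustration only: it is not the Milvus implementation, and names such as queueMetrics, queueProtection and coolOff are invented for the example.

package main

import "fmt"

// queueMetrics holds the per-QueryNode read-queue metrics the quota center reacts to.
type queueMetrics struct {
	nqInQueue       int64   // outstanding NQ waiting in the read queue
	avgQueueLatency float64 // averaged queuing latency, in milliseconds
}

// queueProtection mirrors the quotaAndLimits.limitReading.queueProtection knobs.
type queueProtection struct {
	enabled               bool
	nqInQueueThreshold    int64
	queueLatencyThreshold float64
	coolOffSpeed          float64 // (0, 1]
}

// coolOff returns the search/query rate after one collect interval: while either
// threshold is exceeded, the current rate is multiplied by coolOffSpeed.
func (p queueProtection) coolOff(currentRate float64, m queueMetrics) float64 {
	if !p.enabled {
		return currentRate
	}
	if m.nqInQueue > p.nqInQueueThreshold || m.avgQueueLatency > p.queueLatencyThreshold {
		return currentRate * p.coolOffSpeed
	}
	return currentRate
}

func main() {
	p := queueProtection{enabled: true, nqInQueueThreshold: 100, queueLatencyThreshold: 200, coolOffSpeed: 0.9}
	rate, metrics := 1000.0, queueMetrics{nqInQueue: 150, avgQueueLatency: 50}
	for i := 0; i < 3; i++ { // three collect intervals under backpressure
		rate = p.coolOff(rate, metrics)
		fmt.Printf("interval %d: search rate cooled to %.1f\n", i, rate)
	}
}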

View File

@ -45,7 +45,7 @@ func NewMultiRateLimiter() *MultiRateLimiter {
// If Limit returns true, the request will be rejected.
// Otherwise, the request will pass. Limit also returns the limit of the limiter.
func (m *MultiRateLimiter) Limit(rt internalpb.RateType, n int) (bool, float64) {
if !Params.QuotaConfig.EnableQuotaAndLimits {
if !Params.QuotaConfig.QuotaAndLimitsEnabled {
return false, 1 // no limit
}
// TODO: call other rate limiters
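
A sketch of the check-and-reject contract that Limit exposes to callers. This is not Milvus code: golang.org/x/time/rate stands in for the internal ratelimitutil limiter, and rateType/multiLimiter are illustrative names.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

type rateType int

const (
	dmlInsert rateType = iota
	dqlSearch
)

// multiLimiter keeps one token-bucket limiter per rate type, gated by a global switch.
type multiLimiter struct {
	enabled  bool
	limiters map[rateType]*rate.Limiter
}

// limit reports whether a request consuming n units should be rejected, plus the
// configured limit, mirroring the (bool, float64) contract of MultiRateLimiter.Limit.
func (m *multiLimiter) limit(rt rateType, n int) (bool, float64) {
	if !m.enabled {
		return false, 1 // quota and limits disabled: never reject
	}
	l, ok := m.limiters[rt]
	if !ok {
		return false, 1 // no limiter registered for this rate type
	}
	return !l.AllowN(time.Now(), n), float64(l.Limit())
}

func main() {
	m := &multiLimiter{
		enabled: true,
		limiters: map[rateType]*rate.Limiter{
			dmlInsert: rate.NewLimiter(rate.Limit(1000), 1000), // ~1000 units/s, burst 1000
		},
	}
	fmt.Println(m.limit(dmlInsert, 100))     // within budget: false (not rejected), 1000
	fmt.Println(m.limit(dmlInsert, 1000000)) // far over budget: true (rejected), 1000
}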

View File

@ -28,8 +28,8 @@ import (
func TestMultiRateLimiter(t *testing.T) {
Params.Init()
t.Run("test multiRateLimiter", func(t *testing.T) {
bak := Params.QuotaConfig.EnableQuotaAndLimits
Params.QuotaConfig.EnableQuotaAndLimits = true
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
Params.QuotaConfig.QuotaAndLimitsEnabled = true
multiLimiter := NewMultiRateLimiter()
for _, rt := range internalpb.RateType_value {
multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)
@ -42,17 +42,17 @@ func TestMultiRateLimiter(t *testing.T) {
ok, _ = multiLimiter.Limit(internalpb.RateType(rt), math.MaxInt)
assert.True(t, ok)
}
Params.QuotaConfig.EnableQuotaAndLimits = bak
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
t.Run("not enable quotaAndLimit", func(t *testing.T) {
multiLimiter := NewMultiRateLimiter()
bak := Params.QuotaConfig.EnableQuotaAndLimits
Params.QuotaConfig.EnableQuotaAndLimits = false
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
Params.QuotaConfig.QuotaAndLimitsEnabled = false
ok, r := multiLimiter.Limit(internalpb.RateType(0), 1)
assert.False(t, ok)
assert.NotEqual(t, float64(0), r)
Params.QuotaConfig.EnableQuotaAndLimits = bak
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
}

View File

@ -277,8 +277,11 @@ func (q *QuotaCenter) calculateReadRates() {
q.forceDenyReading(ManualForceDeny)
return
}
coolOffSpeed := Params.QuotaConfig.CoolOffSpeed
if !Params.QuotaConfig.QueueProtectionEnabled {
return
}
coolOffSpeed := Params.QuotaConfig.CoolOffSpeed
coolOff := func(realTimeSearchRate float64, realTimeQueryRate float64) {
if q.currentRates[internalpb.RateType_DQLSearch] != Inf {
q.currentRates[internalpb.RateType_DQLSearch] = Limit(realTimeSearchRate * coolOffSpeed)
@ -384,6 +387,10 @@ func (q *QuotaCenter) resetCurrentRates() {
// timeTickDelay gets the time tick delay of DataNodes and QueryNodes,
// and returns the factor according to the max tolerable time tick delay.
func (q *QuotaCenter) timeTickDelay() (float64, error) {
if !Params.QuotaConfig.TtProtectionEnabled {
return 1, nil
}
maxTt := Params.QuotaConfig.MaxTimeTickDelay
if maxTt < 0 {
// < 0 means disable tt protection
@ -465,6 +472,10 @@ func (q *QuotaCenter) checkQueryLatency() float64 {
// and returns the factor according to the max memory water level.
func (q *QuotaCenter) memoryToWaterLevel() float64 {
factor := float64(1)
if !Params.QuotaConfig.MemProtectionEnabled {
return 1
}
dataNodeMemoryLowWaterLevel := Params.QuotaConfig.DataNodeMemoryLowWaterLevel
dataNodeMemoryHighWaterLevel := Params.QuotaConfig.DataNodeMemoryHighWaterLevel
queryNodeMemoryLowWaterLevel := Params.QuotaConfig.QueryNodeMemoryLowWaterLevel
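
The comments in this file describe two write-protection factors: one driven by time tick delay, one by memory water levels. The following is a self-contained sketch of plausible factor functions, not the actual QuotaCenter code; in particular the linear memory interpolation is only one interpretation consistent with the low/high water-level comments.

package main

import (
	"fmt"
	"time"
)

// timeTickFactor reduces DML rates according to the ratio of time tick delay to
// the max tolerable delay; at or beyond maxDelay all DML requests are rejected (factor 0).
func timeTickFactor(delay, maxDelay time.Duration) float64 {
	if maxDelay <= 0 {
		return 1 // protection disabled
	}
	if delay >= maxDelay {
		return 0
	}
	return 1 - float64(delay)/float64(maxDelay)
}

// memoryFactor returns 1 below the low water level, 0 above the high water level,
// and a linear reduction in between (usage, low and high are fractions of total memory).
func memoryFactor(usage, low, high float64) float64 {
	switch {
	case usage < low:
		return 1 // no action
	case usage > high:
		return 0 // reject all DML requests
	default:
		return (high - usage) / (high - low)
	}
}

func main() {
	fmt.Println(timeTickFactor(9*time.Second, 30*time.Second)) // ~0.7
	fmt.Println(memoryFactor(0.85, 0.8, 0.9))                  // ~0.5, halfway between water levels
}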

View File

@ -144,8 +144,9 @@ func TestQuotaCenter(t *testing.T) {
now := time.Now()
bak := Params.QuotaConfig.MaxTimeTickDelay
Params.QuotaConfig.TtProtectionEnabled = true
Params.QuotaConfig.MaxTimeTickDelay = 3 * time.Second
// test force deny writing
alloc := newMockTsoAllocator()
alloc.GenerateTSOF = func(count uint32) (typeutil.Timestamp, error) {
@ -188,7 +189,6 @@ func TestQuotaCenter(t *testing.T) {
}
_, err = quotaCenter.timeTickDelay()
assert.Error(t, err)
Params.QuotaConfig.MaxTimeTickDelay = bak
})
t.Run("test checkNQInQuery", func(t *testing.T) {
@ -197,7 +197,7 @@ func TestQuotaCenter(t *testing.T) {
assert.Equal(t, float64(1), factor)
// test cool off
bak := Params.QuotaConfig.NQInQueueThreshold
Params.QuotaConfig.QueueProtectionEnabled = true
Params.QuotaConfig.NQInQueueThreshold = 100
quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
SearchQueue: metricsinfo.ReadInfoInQueue{
@ -217,8 +217,6 @@ func TestQuotaCenter(t *testing.T) {
assert.Equal(t, 1.0, factor)
//ok := math.Abs(factor-1.0) < 0.0001
//assert.True(t, ok)
Params.QuotaConfig.NQInQueueThreshold = bak
})
t.Run("test checkQueryLatency", func(t *testing.T) {
@ -227,8 +225,9 @@ func TestQuotaCenter(t *testing.T) {
assert.Equal(t, float64(1), factor)
// test cool off
bak := Params.QuotaConfig.QueueLatencyThreshold
Params.QuotaConfig.QueueProtectionEnabled = true
Params.QuotaConfig.QueueLatencyThreshold = float64(3 * time.Second)
quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
SearchQueue: metricsinfo.ReadInfoInQueue{
AvgQueueDuration: time.Duration(Params.QuotaConfig.QueueLatencyThreshold),
@ -247,8 +246,6 @@ func TestQuotaCenter(t *testing.T) {
assert.Equal(t, 1.0, factor)
//ok := math.Abs(factor-1.0) < 0.0001
//assert.True(t, ok)
Params.QuotaConfig.QueueLatencyThreshold = bak
})
t.Run("test calculateReadRates", func(t *testing.T) {
@ -260,7 +257,8 @@ func TestQuotaCenter(t *testing.T) {
},
}}
latencyBak := Params.QuotaConfig.QueueLatencyThreshold
Params.QuotaConfig.ForceDenyReading = false
Params.QuotaConfig.QueueProtectionEnabled = true
Params.QuotaConfig.QueueLatencyThreshold = 100
quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
SearchQueue: metricsinfo.ReadInfoInQueue{
@ -270,9 +268,7 @@ func TestQuotaCenter(t *testing.T) {
quotaCenter.calculateReadRates()
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
Params.QuotaConfig.QueueLatencyThreshold = latencyBak
queueBak := Params.QuotaConfig.NQInQueueThreshold
Params.QuotaConfig.NQInQueueThreshold = 100
quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
SearchQueue: metricsinfo.ReadInfoInQueue{
@ -282,7 +278,6 @@ func TestQuotaCenter(t *testing.T) {
quotaCenter.calculateReadRates()
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
Params.QuotaConfig.NQInQueueThreshold = queueBak
})
t.Run("test calculateWriteRates", func(t *testing.T) {

View File

@ -619,7 +619,7 @@ func (c *Core) startInternal() error {
go c.importManager.expireOldTasksLoop(&c.wg, c.broker.ReleaseSegRefLock)
go c.importManager.sendOutTasksLoop(&c.wg)
if Params.QuotaConfig.EnableQuotaAndLimits {
if Params.QuotaConfig.QuotaAndLimitsEnabled {
go c.quotaCenter.run()
}
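
For context, the quota center loop is only started when quotaAndLimits is enabled. Below is a self-contained sketch of such a guarded collect loop, assuming a ticker at quotaCenterCollectInterval; it is illustrative only, since the real run() is not part of this diff.

package main

import (
	"fmt"
	"time"
)

type quotaCenter struct {
	collectInterval time.Duration
	stop            chan struct{}
}

// run collects metrics every collectInterval and recomputes the current rates.
func (q *quotaCenter) run() {
	ticker := time.NewTicker(q.collectInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("collect Query/Data cluster metrics, recalculate read/write rates")
		case <-q.stop:
			return
		}
	}
}

func main() {
	enabled := true // stands in for Params.QuotaConfig.QuotaAndLimitsEnabled
	q := &quotaCenter{collectInterval: 3 * time.Second, stop: make(chan struct{})}
	if enabled {
		go q.run()
	}
	time.Sleep(7 * time.Second) // let the loop tick twice
	close(q.stop)
}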

View File

@ -42,11 +42,11 @@ type quotaConfig struct {
Base *BaseTable
once sync.Once
EnableQuotaAndLimits bool
QuotaAndLimitsEnabled bool
QuotaCenterCollectInterval float64
// ddl
DDLLimitEnabled bool
DDLCollectionRate float64
DDLPartitionRate float64
DDLIndexRate float64
@ -54,6 +54,7 @@ type quotaConfig struct {
DDLCompactionRate float64
// dml
DMLLimitEnabled bool
DMLMaxInsertRate float64
DMLMinInsertRate float64
DMLMaxDeleteRate float64
@ -62,6 +63,7 @@ type quotaConfig struct {
DMLMinBulkLoadRate float64
// dql
DQLLimitEnabled bool
DQLMaxSearchRate float64
DQLMinSearchRate float64
DQLMaxQueryRate float64
@ -70,31 +72,40 @@ type quotaConfig struct {
// limits
MaxCollectionNum int
// limit writing
ForceDenyWriting bool
TtProtectionEnabled bool
MaxTimeTickDelay time.Duration
MemProtectionEnabled bool
DataNodeMemoryLowWaterLevel float64
DataNodeMemoryHighWaterLevel float64
QueryNodeMemoryLowWaterLevel float64
QueryNodeMemoryHighWaterLevel float64
ForceDenyReading bool
NQInQueueThreshold int64
QueueLatencyThreshold float64
CoolOffSpeed float64
// limit reading
ForceDenyReading bool
QueueProtectionEnabled bool
NQInQueueThreshold int64
QueueLatencyThreshold float64
CoolOffSpeed float64
}
func (p *quotaConfig) init(base *BaseTable) {
p.Base = base
p.initEnableQuotaAndLimits()
p.initQuotaAndLimitsEnabled()
p.initQuotaCenterCollectInterval()
// ddl
p.initDDLLimitEnabled()
p.initDDLCollectionRate()
p.initDDLPartitionRate()
p.initDDLIndexRate()
p.initDDLFlushRate()
p.initDDLCompactionRate()
// dml
p.initDMLLimitEnabled()
p.initDMLMaxInsertRate()
p.initDMLMinInsertRate()
p.initDMLMaxDeleteRate()
@ -102,28 +113,36 @@ func (p *quotaConfig) init(base *BaseTable) {
p.initDMLMaxBulkLoadRate()
p.initDMLMinBulkLoadRate()
// dql
p.initDQLLimitEnabled()
p.initDQLMaxSearchRate()
p.initDQLMinSearchRate()
p.initDQLMaxQueryRate()
p.initDQLMinQueryRate()
// limits
p.initMaxCollectionNum()
// limit writing
p.initForceDenyWriting()
p.initTtProtectionEnabled()
p.initMaxTimeTickDelay()
p.initMemProtectionEnabled()
p.initDataNodeMemoryLowWaterLevel()
p.initDataNodeMemoryHighWaterLevel()
p.initQueryNodeMemoryLowWaterLevel()
p.initQueryNodeMemoryHighWaterLevel()
// limit reading
p.initForceDenyReading()
p.initQueueProtectionEnabled()
p.initNQInQueueThreshold()
p.initQueueLatencyThreshold()
p.initCoolOffSpeed()
}
func (p *quotaConfig) initEnableQuotaAndLimits() {
p.EnableQuotaAndLimits = p.Base.ParseBool("quotaAndLimits.enable", false)
func (p *quotaConfig) initQuotaAndLimitsEnabled() {
p.QuotaAndLimitsEnabled = p.Base.ParseBool("quotaAndLimits.enabled", false)
}
func (p *quotaConfig) initQuotaCenterCollectInterval() {
@ -135,7 +154,15 @@ func (p *quotaConfig) initQuotaCenterCollectInterval() {
}
}
func (p *quotaConfig) initDDLLimitEnabled() {
p.DDLLimitEnabled = p.Base.ParseBool("quotaAndLimits.ddl.enabled", false)
}
func (p *quotaConfig) initDDLCollectionRate() {
if !p.DDLLimitEnabled {
p.DDLCollectionRate = defaultMax
return
}
p.DDLCollectionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.collectionRate", defaultMax)
// [0 ~ Inf)
if p.DDLCollectionRate < 0 {
@ -144,6 +171,10 @@ func (p *quotaConfig) initDDLCollectionRate() {
}
func (p *quotaConfig) initDDLPartitionRate() {
if !p.DDLLimitEnabled {
p.DDLPartitionRate = defaultMax
return
}
p.DDLPartitionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.partitionRate", defaultMax)
// [0 ~ Inf)
if p.DDLPartitionRate < 0 {
@ -152,6 +183,10 @@ func (p *quotaConfig) initDDLPartitionRate() {
}
func (p *quotaConfig) initDDLIndexRate() {
if !p.DDLLimitEnabled {
p.DDLIndexRate = defaultMax
return
}
p.DDLIndexRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.indexRate", defaultMax)
// [0 ~ Inf)
if p.DDLIndexRate < 0 {
@ -160,6 +195,10 @@ func (p *quotaConfig) initDDLIndexRate() {
}
func (p *quotaConfig) initDDLFlushRate() {
if !p.DDLLimitEnabled {
p.DDLFlushRate = defaultMax
return
}
p.DDLFlushRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.flushRate", defaultMax)
// [0 ~ Inf)
if p.DDLFlushRate < 0 {
@ -168,6 +207,10 @@ func (p *quotaConfig) initDDLFlushRate() {
}
func (p *quotaConfig) initDDLCompactionRate() {
if !p.DDLLimitEnabled {
p.DDLCompactionRate = defaultMax
return
}
p.DDLCompactionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.compactionRate", defaultMax)
// [0 ~ Inf)
if p.DDLCompactionRate < 0 {
@ -188,7 +231,15 @@ func (p *quotaConfig) checkMinMaxLegal(min, max float64) bool {
return true
}
func (p *quotaConfig) initDMLLimitEnabled() {
p.DMLLimitEnabled = p.Base.ParseBool("quotaAndLimits.dml.enabled", false)
}
func (p *quotaConfig) initDMLMaxInsertRate() {
if !p.DMLLimitEnabled {
p.DMLMaxInsertRate = defaultMax
return
}
p.DMLMaxInsertRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.insertRate.max", defaultMax)
if math.Abs(p.DMLMaxInsertRate-defaultMax) > 0.001 { // maxRate != defaultMax
p.DMLMaxInsertRate = megaBytesRate2Bytes(p.DMLMaxInsertRate)
@ -200,6 +251,10 @@ func (p *quotaConfig) initDMLMaxInsertRate() {
}
func (p *quotaConfig) initDMLMinInsertRate() {
if !p.DMLLimitEnabled {
p.DMLMinInsertRate = defaultMin
return
}
p.DMLMinInsertRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.insertRate.min", defaultMin)
p.DMLMinInsertRate = megaBytesRate2Bytes(p.DMLMinInsertRate)
// [0, inf)
@ -213,6 +268,10 @@ func (p *quotaConfig) initDMLMinInsertRate() {
}
func (p *quotaConfig) initDMLMaxDeleteRate() {
if !p.DMLLimitEnabled {
p.DMLMaxDeleteRate = defaultMax
return
}
p.DMLMaxDeleteRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.deleteRate.max", defaultMax)
if math.Abs(p.DMLMaxDeleteRate-defaultMax) > 0.001 { // maxRate != defaultMax
p.DMLMaxDeleteRate = megaBytesRate2Bytes(p.DMLMaxDeleteRate)
@ -224,6 +283,10 @@ func (p *quotaConfig) initDMLMaxDeleteRate() {
}
func (p *quotaConfig) initDMLMinDeleteRate() {
if !p.DMLLimitEnabled {
p.DMLMinDeleteRate = defaultMin
return
}
p.DMLMinDeleteRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.deleteRate.min", defaultMin)
p.DMLMinDeleteRate = megaBytesRate2Bytes(p.DMLMinDeleteRate)
// [0, inf)
@ -237,6 +300,10 @@ func (p *quotaConfig) initDMLMinDeleteRate() {
}
func (p *quotaConfig) initDMLMaxBulkLoadRate() {
if !p.DMLLimitEnabled {
p.DMLMaxBulkLoadRate = defaultMax
return
}
p.DMLMaxBulkLoadRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.bulkLoadRate.max", defaultMax)
if math.Abs(p.DMLMaxBulkLoadRate-defaultMax) > 0.001 { // maxRate != defaultMax
p.DMLMaxBulkLoadRate = megaBytesRate2Bytes(p.DMLMaxBulkLoadRate)
@ -248,6 +315,10 @@ func (p *quotaConfig) initDMLMaxBulkLoadRate() {
}
func (p *quotaConfig) initDMLMinBulkLoadRate() {
if !p.DMLLimitEnabled {
p.DMLMinBulkLoadRate = defaultMin
return
}
p.DMLMinBulkLoadRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.bulkLoadRate.min", defaultMin)
p.DMLMinBulkLoadRate = megaBytesRate2Bytes(p.DMLMinBulkLoadRate)
// [0, inf)
@ -260,7 +331,15 @@ func (p *quotaConfig) initDMLMinBulkLoadRate() {
}
}
func (p *quotaConfig) initDQLLimitEnabled() {
p.DQLLimitEnabled = p.Base.ParseBool("quotaAndLimits.dql.enabled", false)
}
func (p *quotaConfig) initDQLMaxSearchRate() {
if !p.DQLLimitEnabled {
p.DQLMaxSearchRate = defaultMax
return
}
p.DQLMaxSearchRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.searchRate.max", defaultMax)
// [0, inf)
if p.DQLMaxSearchRate < 0 {
@ -269,6 +348,10 @@ func (p *quotaConfig) initDQLMaxSearchRate() {
}
func (p *quotaConfig) initDQLMinSearchRate() {
if !p.DQLLimitEnabled {
p.DQLMinSearchRate = defaultMin
return
}
p.DQLMinSearchRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.searchRate.min", defaultMin)
// [0, inf)
if p.DQLMinSearchRate < 0 {
@ -281,6 +364,10 @@ func (p *quotaConfig) initDQLMinSearchRate() {
}
func (p *quotaConfig) initDQLMaxQueryRate() {
if !p.DQLLimitEnabled {
p.DQLMaxQueryRate = defaultMax
return
}
p.DQLMaxQueryRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.queryRate.max", defaultMax)
// [0, inf)
if p.DQLMaxQueryRate < 0 {
@ -289,6 +376,10 @@ func (p *quotaConfig) initDQLMaxQueryRate() {
}
func (p *quotaConfig) initDQLMinQueryRate() {
if !p.DQLLimitEnabled {
p.DQLMinQueryRate = defaultMin
return
}
p.DQLMinQueryRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dql.queryRate.min", defaultMin)
// [0, inf)
if p.DQLMinQueryRate < 0 {
@ -308,9 +399,16 @@ func (p *quotaConfig) initForceDenyWriting() {
p.ForceDenyWriting = p.Base.ParseBool("quotaAndLimits.limitWriting.forceDeny", false)
}
func (p *quotaConfig) initTtProtectionEnabled() {
p.TtProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitWriting.ttProtection.enabled", true)
}
func (p *quotaConfig) initMaxTimeTickDelay() {
if !p.TtProtectionEnabled {
return
}
const defaultMaxTtDelay = 30.0
delay := p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.maxTimeTickDelay", defaultMaxTtDelay)
delay := p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.ttProtection.maxTimeTickDelay", defaultMaxTtDelay)
// (0, 65536)
if delay <= 0 || delay >= 65536 {
delay = defaultMaxTtDelay
@ -318,8 +416,15 @@ func (p *quotaConfig) initMaxTimeTickDelay() {
p.MaxTimeTickDelay = time.Duration(delay * float64(time.Second))
}
func (p *quotaConfig) initMemProtectionEnabled() {
p.MemProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitWriting.memProtection.enabled", true)
}
func (p *quotaConfig) initDataNodeMemoryLowWaterLevel() {
p.DataNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.dataNodeMemoryLowWaterLevel", defaultLowWaterLevel)
if !p.MemProtectionEnabled {
return
}
p.DataNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.dataNodeMemoryLowWaterLevel", defaultLowWaterLevel)
// (0, 1]
if p.DataNodeMemoryLowWaterLevel <= 0 || p.DataNodeMemoryLowWaterLevel > 1 {
log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.DataNodeMemoryLowWaterLevel), zap.Float64("default", defaultLowWaterLevel))
@ -328,7 +433,10 @@ func (p *quotaConfig) initDataNodeMemoryLowWaterLevel() {
}
func (p *quotaConfig) initDataNodeMemoryHighWaterLevel() {
p.DataNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.dataNodeMemoryHighWaterLevel", defaultHighWaterLevel)
if !p.MemProtectionEnabled {
return
}
p.DataNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.dataNodeMemoryHighWaterLevel", defaultHighWaterLevel)
// (0, 1]
if p.DataNodeMemoryHighWaterLevel <= 0 || p.DataNodeMemoryHighWaterLevel > 1 {
log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.DataNodeMemoryHighWaterLevel), zap.Float64("default", defaultHighWaterLevel))
@ -341,7 +449,10 @@ func (p *quotaConfig) initDataNodeMemoryHighWaterLevel() {
}
func (p *quotaConfig) initQueryNodeMemoryLowWaterLevel() {
p.QueryNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.queryNodeMemoryLowWaterLevel", defaultLowWaterLevel)
if !p.MemProtectionEnabled {
return
}
p.QueryNodeMemoryLowWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.queryNodeMemoryLowWaterLevel", defaultLowWaterLevel)
// (0, 1]
if p.QueryNodeMemoryLowWaterLevel <= 0 || p.QueryNodeMemoryLowWaterLevel > 1 {
log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.QueryNodeMemoryLowWaterLevel), zap.Float64("default", defaultLowWaterLevel))
@ -350,7 +461,10 @@ func (p *quotaConfig) initQueryNodeMemoryLowWaterLevel() {
}
func (p *quotaConfig) initQueryNodeMemoryHighWaterLevel() {
p.QueryNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.queryNodeMemoryHighWaterLevel", defaultHighWaterLevel)
if !p.MemProtectionEnabled {
return
}
p.QueryNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.queryNodeMemoryHighWaterLevel", defaultHighWaterLevel)
// (0, 1]
if p.QueryNodeMemoryHighWaterLevel <= 0 || p.QueryNodeMemoryHighWaterLevel > 1 {
log.Warn("MemoryLowWaterLevel must in the range of `(0, 1]`, use default value", zap.Float64("low", p.QueryNodeMemoryHighWaterLevel), zap.Float64("default", defaultHighWaterLevel))
@ -366,8 +480,15 @@ func (p *quotaConfig) initForceDenyReading() {
p.ForceDenyReading = p.Base.ParseBool("quotaAndLimits.limitReading.forceDeny", false)
}
func (p *quotaConfig) initQueueProtectionEnabled() {
p.QueueProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitReading.queueProtection.enabled", false)
}
func (p *quotaConfig) initNQInQueueThreshold() {
p.NQInQueueThreshold = p.Base.ParseInt64WithDefault("quotaAndLimits.limitReading.NQInQueueThreshold", math.MaxInt64)
if !p.QueueProtectionEnabled {
return
}
p.NQInQueueThreshold = p.Base.ParseInt64WithDefault("quotaAndLimits.limitReading.queueProtection.nqInQueueThreshold", math.MaxInt64)
// [0, inf)
if p.NQInQueueThreshold < 0 {
p.NQInQueueThreshold = math.MaxInt64
@ -375,7 +496,10 @@ func (p *quotaConfig) initNQInQueueThreshold() {
}
func (p *quotaConfig) initQueueLatencyThreshold() {
p.QueueLatencyThreshold = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueLatencyThreshold", defaultMax)
if !p.QueueProtectionEnabled {
return
}
p.QueueLatencyThreshold = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.queueLatencyThreshold", defaultMax)
// [0, inf)
if p.QueueLatencyThreshold < 0 {
p.QueueLatencyThreshold = defaultMax
@ -384,7 +508,11 @@ func (p *quotaConfig) initQueueLatencyThreshold() {
func (p *quotaConfig) initCoolOffSpeed() {
const defaultSpeed = 0.9
p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.coolOffSpeed", defaultSpeed)
p.CoolOffSpeed = defaultSpeed
if !p.QueueProtectionEnabled {
return
}
p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.coolOffSpeed", defaultSpeed)
// (0, 1]
if p.CoolOffSpeed <= 0 || p.CoolOffSpeed > 1 {
log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed))

View File

@ -17,7 +17,6 @@
package paramtable
import (
"math"
"testing"
"time"
@ -29,11 +28,12 @@ func TestQuotaParam(t *testing.T) {
qc.init(&baseParams)
t.Run("test quota", func(t *testing.T) {
assert.False(t, qc.EnableQuotaAndLimits)
assert.False(t, qc.QuotaAndLimitsEnabled)
assert.Equal(t, float64(3), qc.QuotaCenterCollectInterval)
})
t.Run("test ddl", func(t *testing.T) {
assert.Equal(t, false, qc.DDLLimitEnabled)
assert.Equal(t, defaultMax, qc.DDLCollectionRate)
assert.Equal(t, defaultMax, qc.DDLPartitionRate)
assert.Equal(t, defaultMax, qc.DDLIndexRate)
@ -42,6 +42,7 @@ func TestQuotaParam(t *testing.T) {
})
t.Run("test dml", func(t *testing.T) {
assert.Equal(t, false, qc.DMLLimitEnabled)
assert.Equal(t, defaultMax, qc.DMLMaxInsertRate)
assert.Equal(t, defaultMin, qc.DMLMinInsertRate)
assert.Equal(t, defaultMax, qc.DMLMaxDeleteRate)
@ -51,6 +52,7 @@ func TestQuotaParam(t *testing.T) {
})
t.Run("test dql", func(t *testing.T) {
assert.Equal(t, false, qc.DQLLimitEnabled)
assert.Equal(t, defaultMax, qc.DQLMaxSearchRate)
assert.Equal(t, defaultMin, qc.DQLMinSearchRate)
assert.Equal(t, defaultMax, qc.DQLMaxQueryRate)
@ -61,8 +63,9 @@ func TestQuotaParam(t *testing.T) {
assert.Equal(t, 64, qc.MaxCollectionNum)
})
t.Run("test force deny writing", func(t *testing.T) {
t.Run("test limit writing", func(t *testing.T) {
assert.False(t, qc.ForceDenyWriting)
assert.Equal(t, true, qc.TtProtectionEnabled)
assert.Equal(t, 30*time.Second, qc.MaxTimeTickDelay)
assert.Equal(t, defaultLowWaterLevel, qc.DataNodeMemoryLowWaterLevel)
assert.Equal(t, defaultHighWaterLevel, qc.DataNodeMemoryHighWaterLevel)
@ -70,10 +73,11 @@ func TestQuotaParam(t *testing.T) {
assert.Equal(t, defaultHighWaterLevel, qc.QueryNodeMemoryHighWaterLevel)
})
t.Run("test force deny reading", func(t *testing.T) {
t.Run("test limit reading", func(t *testing.T) {
assert.False(t, qc.ForceDenyReading)
assert.Equal(t, int64(math.MaxInt64), qc.NQInQueueThreshold)
assert.Equal(t, float64(defaultMax), qc.QueueLatencyThreshold)
assert.Equal(t, false, qc.QueueProtectionEnabled)
assert.Equal(t, int64(0), qc.NQInQueueThreshold)
assert.Equal(t, float64(0), qc.QueueLatencyThreshold)
assert.Equal(t, 0.9, qc.CoolOffSpeed)
})
}