mirror of https://github.com/milvus-io/milvus.git
Enable rate limit and update quota unit (#19816)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/19831/head
parent
ae8ec28925
commit
760e9c5df2
|
@ -384,7 +384,7 @@ common:
|
|||
# 4. DQL result rate protection;
|
||||
# If necessary, you can also manually force to deny RW requests.
|
||||
quotaAndLimits:
|
||||
enabled: false # `true` to enable quota and limits, `false` to disable.
|
||||
enabled: true # `true` to enable quota and limits, `false` to disable.
|
||||
|
||||
# quotaCenterCollectInterval is the time interval that quotaCenter
|
||||
# collects metrics from Proxies, Query cluster and Data cluster.
|
||||
|
|
|
@ -250,6 +250,7 @@ func (q *QuotaCenter) syncMetrics() error {
|
|||
func (q *QuotaCenter) forceDenyWriting(reason ForceDenyTriggerReason) {
|
||||
q.currentRates[internalpb.RateType_DMLInsert] = 0
|
||||
q.currentRates[internalpb.RateType_DMLDelete] = 0
|
||||
q.currentRates[internalpb.RateType_DMLBulkLoad] = 0
|
||||
log.Warn("QuotaCenter force to deny writing", zap.String("reason", string(reason)))
|
||||
}
|
||||
|
||||
|
@ -393,6 +394,8 @@ func (q *QuotaCenter) resetCurrentRates() {
|
|||
q.currentRates[rt] = Limit(Params.QuotaConfig.DMLMaxInsertRate)
|
||||
case internalpb.RateType_DMLDelete:
|
||||
q.currentRates[rt] = Limit(Params.QuotaConfig.DMLMaxDeleteRate)
|
||||
case internalpb.RateType_DMLBulkLoad:
|
||||
q.currentRates[rt] = Limit(Params.QuotaConfig.DMLMaxBulkLoadRate)
|
||||
case internalpb.RateType_DQLSearch:
|
||||
q.currentRates[rt] = Limit(Params.QuotaConfig.DQLMaxSearchRate)
|
||||
case internalpb.RateType_DQLQuery:
|
||||
|
|
|
@ -29,10 +29,10 @@ import (
|
|||
const (
|
||||
// defaultMax is the default unlimited rate or threshold.
|
||||
defaultMax = float64(math.MaxFloat64)
|
||||
// GBSize used to convert gigabytes and bytes.
|
||||
GBSize = 1024.0 * 1024.0 * 1024.0
|
||||
// defaultDiskQuotaInGB is the default disk quota in gigabytes.
|
||||
defaultDiskQuotaInGB = defaultMax / GBSize
|
||||
// MBSize used to convert megabytes and bytes.
|
||||
MBSize = 1024.0 * 1024.0
|
||||
// defaultDiskQuotaInMB is the default disk quota in megabytes.
|
||||
defaultDiskQuotaInMB = defaultMax / MBSize
|
||||
// defaultMin is the default minimal rate.
|
||||
defaultMin = float64(0)
|
||||
// defaultLowWaterLevel is the default memory low water level.
|
||||
|
@ -250,8 +250,8 @@ func (p *quotaConfig) initMaxCompactionRate() {
|
|||
}
|
||||
}
|
||||
|
||||
func megaBytesRate2Bytes(f float64) float64 {
|
||||
return f * 1024 * 1024
|
||||
func megaBytes2Bytes(f float64) float64 {
|
||||
return f * MBSize
|
||||
}
|
||||
|
||||
func (p *quotaConfig) checkMinMaxLegal(min, max float64) bool {
|
||||
|
@ -274,7 +274,7 @@ func (p *quotaConfig) initDMLMaxInsertRate() {
|
|||
}
|
||||
p.DMLMaxInsertRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.insertRate.max", defaultMax)
|
||||
if math.Abs(p.DMLMaxInsertRate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
p.DMLMaxInsertRate = megaBytesRate2Bytes(p.DMLMaxInsertRate)
|
||||
p.DMLMaxInsertRate = megaBytes2Bytes(p.DMLMaxInsertRate)
|
||||
}
|
||||
// [0, inf)
|
||||
if p.DMLMaxInsertRate < 0 {
|
||||
|
@ -288,7 +288,7 @@ func (p *quotaConfig) initDMLMinInsertRate() {
|
|||
return
|
||||
}
|
||||
p.DMLMinInsertRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.insertRate.min", defaultMin)
|
||||
p.DMLMinInsertRate = megaBytesRate2Bytes(p.DMLMinInsertRate)
|
||||
p.DMLMinInsertRate = megaBytes2Bytes(p.DMLMinInsertRate)
|
||||
// [0, inf)
|
||||
if p.DMLMinInsertRate < 0 {
|
||||
p.DMLMinInsertRate = defaultMin
|
||||
|
@ -306,7 +306,7 @@ func (p *quotaConfig) initDMLMaxDeleteRate() {
|
|||
}
|
||||
p.DMLMaxDeleteRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.deleteRate.max", defaultMax)
|
||||
if math.Abs(p.DMLMaxDeleteRate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
p.DMLMaxDeleteRate = megaBytesRate2Bytes(p.DMLMaxDeleteRate)
|
||||
p.DMLMaxDeleteRate = megaBytes2Bytes(p.DMLMaxDeleteRate)
|
||||
}
|
||||
// [0, inf)
|
||||
if p.DMLMaxDeleteRate < 0 {
|
||||
|
@ -320,7 +320,7 @@ func (p *quotaConfig) initDMLMinDeleteRate() {
|
|||
return
|
||||
}
|
||||
p.DMLMinDeleteRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.deleteRate.min", defaultMin)
|
||||
p.DMLMinDeleteRate = megaBytesRate2Bytes(p.DMLMinDeleteRate)
|
||||
p.DMLMinDeleteRate = megaBytes2Bytes(p.DMLMinDeleteRate)
|
||||
// [0, inf)
|
||||
if p.DMLMinDeleteRate < 0 {
|
||||
p.DMLMinDeleteRate = defaultMin
|
||||
|
@ -338,7 +338,7 @@ func (p *quotaConfig) initDMLMaxBulkLoadRate() {
|
|||
}
|
||||
p.DMLMaxBulkLoadRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.bulkLoadRate.max", defaultMax)
|
||||
if math.Abs(p.DMLMaxBulkLoadRate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
p.DMLMaxBulkLoadRate = megaBytesRate2Bytes(p.DMLMaxBulkLoadRate)
|
||||
p.DMLMaxBulkLoadRate = megaBytes2Bytes(p.DMLMaxBulkLoadRate)
|
||||
}
|
||||
// [0, inf)
|
||||
if p.DMLMaxBulkLoadRate < 0 {
|
||||
|
@ -352,7 +352,7 @@ func (p *quotaConfig) initDMLMinBulkLoadRate() {
|
|||
return
|
||||
}
|
||||
p.DMLMinBulkLoadRate = p.Base.ParseFloatWithDefault("quotaAndLimits.dml.bulkLoadRate.min", defaultMin)
|
||||
p.DMLMinBulkLoadRate = megaBytesRate2Bytes(p.DMLMinBulkLoadRate)
|
||||
p.DMLMinBulkLoadRate = megaBytes2Bytes(p.DMLMinBulkLoadRate)
|
||||
// [0, inf)
|
||||
if p.DMLMinBulkLoadRate < 0 {
|
||||
p.DMLMinBulkLoadRate = defaultMin
|
||||
|
@ -520,14 +520,15 @@ func (p *quotaConfig) initDiskQuota() {
|
|||
p.DiskQuota = defaultMax
|
||||
return
|
||||
}
|
||||
p.DiskQuota = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.diskProtection.diskQuota", defaultDiskQuotaInGB)
|
||||
p.DiskQuota = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.diskProtection.diskQuota", defaultDiskQuotaInMB)
|
||||
// (0, +inf)
|
||||
if p.DiskQuota <= 0 {
|
||||
log.Warn("DiskQuota must in the range of `(0, +inf)`, use default +inf", zap.Float64("DiskQuota", p.DiskQuota))
|
||||
p.DiskQuota = defaultDiskQuotaInGB
|
||||
p.DiskQuota = defaultDiskQuotaInMB
|
||||
}
|
||||
// gigabytes to bytes
|
||||
p.DiskQuota = p.DiskQuota * GBSize
|
||||
log.Debug("init disk quota", zap.Float64("diskQuota(MB)", p.DiskQuota))
|
||||
// megabytes to bytes
|
||||
p.DiskQuota = megaBytes2Bytes(p.DiskQuota)
|
||||
}
|
||||
|
||||
func (p *quotaConfig) initForceDenyReading() {
|
||||
|
@ -573,7 +574,7 @@ func (p *quotaConfig) initMaxReadResultRate() {
|
|||
}
|
||||
p.MaxReadResultRate = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.resultProtection.maxReadResultRate", defaultMax)
|
||||
if math.Abs(p.MaxReadResultRate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
p.MaxReadResultRate = megaBytesRate2Bytes(p.MaxReadResultRate)
|
||||
p.MaxReadResultRate = megaBytes2Bytes(p.MaxReadResultRate)
|
||||
}
|
||||
// [0, inf)
|
||||
if p.MaxReadResultRate < 0 {
|
||||
|
|
|
@ -29,7 +29,7 @@ func TestQuotaParam(t *testing.T) {
|
|||
qc.init(&baseParams)
|
||||
|
||||
t.Run("test quota", func(t *testing.T) {
|
||||
assert.False(t, qc.QuotaAndLimitsEnabled)
|
||||
assert.True(t, qc.QuotaAndLimitsEnabled)
|
||||
assert.Equal(t, float64(3), qc.QuotaCenterCollectInterval)
|
||||
})
|
||||
|
||||
|
|
Loading…
Reference in New Issue