mirror of https://github.com/milvus-io/milvus.git
Add read result rate protection (#19728)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/19756/head
parent
ab88dd77e7
commit
f1e330a997
|
@ -373,6 +373,7 @@ common:
|
||||||
# 1. DML throughput limitation;
|
# 1. DML throughput limitation;
|
||||||
# 2. DDL, DQL qps/rps limitation;
|
# 2. DDL, DQL qps/rps limitation;
|
||||||
# 3. DQL Queue length/latency protection;
|
# 3. DQL Queue length/latency protection;
|
||||||
|
# 4. DQL result rate protection;
|
||||||
# If necessary, you can also manually force to deny RW requests.
|
# If necessary, you can also manually force to deny RW requests.
|
||||||
quotaAndLimits:
|
quotaAndLimits:
|
||||||
enabled: false # `true` to enable quota and limits, `false` to disable.
|
enabled: false # `true` to enable quota and limits, `false` to disable.
|
||||||
|
@ -383,38 +384,38 @@ quotaAndLimits:
|
||||||
|
|
||||||
ddl: # ddl limit rates, default no limit.
|
ddl: # ddl limit rates, default no limit.
|
||||||
enabled: false
|
enabled: false
|
||||||
collectionRate: # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection
|
collectionRate: -1 # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection
|
||||||
partitionRate: # qps, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition
|
partitionRate: -1 # qps, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition
|
||||||
|
|
||||||
indexRate:
|
indexRate:
|
||||||
enabled: false
|
enabled: false
|
||||||
max: # qps, default no limit, rate for CreateIndex, DropIndex
|
max: -1 # qps, default no limit, rate for CreateIndex, DropIndex
|
||||||
flushRate:
|
flushRate:
|
||||||
enabled: false
|
enabled: false
|
||||||
max: # qps, default no limit, rate for flush
|
max: -1 # qps, default no limit, rate for flush
|
||||||
compactionRate:
|
compactionRate:
|
||||||
enabled: false
|
enabled: false
|
||||||
max: # qps, default no limit, rate for manualCompaction
|
max: -1 # qps, default no limit, rate for manualCompaction
|
||||||
|
|
||||||
# dml limit rates, default no limit.
|
# dml limit rates, default no limit.
|
||||||
# The maximum rate will not be greater than `max`.
|
# The maximum rate will not be greater than `max`.
|
||||||
dml:
|
dml:
|
||||||
enabled: false
|
enabled: false
|
||||||
insertRate:
|
insertRate:
|
||||||
max: # MB/s, default no limit
|
max: -1 # MB/s, default no limit
|
||||||
deleteRate:
|
deleteRate:
|
||||||
max: # MB/s, default no limit
|
max: -1 # MB/s, default no limit
|
||||||
bulkLoadRate: # not support yet. TODO: limit bulkLoad rate
|
bulkLoadRate: # not support yet. TODO: limit bulkLoad rate
|
||||||
max: # MB/s, default no limit
|
max: -1 # MB/s, default no limit
|
||||||
|
|
||||||
# dql limit rates, default no limit.
|
# dql limit rates, default no limit.
|
||||||
# The maximum rate will not be greater than `max`.
|
# The maximum rate will not be greater than `max`.
|
||||||
dql:
|
dql:
|
||||||
enabled: false
|
enabled: false
|
||||||
searchRate:
|
searchRate:
|
||||||
max: # vps (vectors per second), default no limit
|
max: -1 # vps (vectors per second), default no limit
|
||||||
queryRate:
|
queryRate:
|
||||||
max: # qps, default no limit
|
max: -1 # qps, default no limit
|
||||||
|
|
||||||
# limitWriting decides whether dml requests are allowed.
|
# limitWriting decides whether dml requests are allowed.
|
||||||
limitWriting:
|
limitWriting:
|
||||||
|
@ -440,29 +441,33 @@ quotaAndLimits:
|
||||||
diskProtection:
|
diskProtection:
|
||||||
# When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
|
# When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
|
||||||
enabled: true
|
enabled: true
|
||||||
diskQuota: -1 # GB, (0, +inf), -1 means use default +inf
|
diskQuota: -1 # GB, (0, +inf), default no limit
|
||||||
|
|
||||||
# limitReading decides whether dql requests are allowed.
|
# limitReading decides whether dql requests are allowed.
|
||||||
limitReading:
|
limitReading:
|
||||||
# forceDeny `false` means dql requests are allowed (except for some
|
# forceDeny `false` means dql requests are allowed (except for some
|
||||||
# specific conditions, such as collection has been dropped), `true` means always reject all dql requests.
|
# specific conditions, such as collection has been dropped), `true` means always reject all dql requests.
|
||||||
forceDeny: false
|
forceDeny: false
|
||||||
|
|
||||||
queueProtection:
|
queueProtection:
|
||||||
enabled: false
|
enabled: false
|
||||||
# nqInQueueThreshold indicated that the system was under backpressure for Search/Query path.
|
# nqInQueueThreshold indicated that the system was under backpressure for Search/Query path.
|
||||||
# If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off
|
# If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off
|
||||||
# until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1.
|
# until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1.
|
||||||
nqInQueueThreshold: # int, default no limit
|
nqInQueueThreshold: -1 # int, default no limit
|
||||||
|
|
||||||
# queueLatencyThreshold indicated that the system was under backpressure for Search/Query path.
|
# queueLatencyThreshold indicated that the system was under backpressure for Search/Query path.
|
||||||
# If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off
|
# If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off
|
||||||
# until the latency of queuing no longer exceeds queueLatencyThreshold.
|
# until the latency of queuing no longer exceeds queueLatencyThreshold.
|
||||||
# The latency here refers to the averaged latency over a period of time.
|
# The latency here refers to the averaged latency over a period of time.
|
||||||
queueLatencyThreshold: # milliseconds, default no limit
|
queueLatencyThreshold: -1 # milliseconds, default no limit
|
||||||
|
resultProtection:
|
||||||
# coolOffSpeed is the speed of search&query rates cool off.
|
enabled: false
|
||||||
coolOffSpeed: 0.9 # (0, 1]
|
# maxReadResultRate indicated that the system was under backpressure for Search/Query path.
|
||||||
|
# If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off
|
||||||
|
# until the read result rate no longer exceeds maxReadResultRate.
|
||||||
|
maxReadResultRate: -1 # MB/s, default no limit
|
||||||
|
# coolOffSpeed is the speed of search&query rates cool off.
|
||||||
|
coolOffSpeed: 0.9 # (0, 1]
|
||||||
|
|
||||||
# AutoIndexConfig
|
# AutoIndexConfig
|
||||||
autoIndex:
|
autoIndex:
|
||||||
|
|
|
@ -2850,6 +2850,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
|
||||||
if qt.result != nil {
|
if qt.result != nil {
|
||||||
sentSize := proto.Size(qt.result)
|
sentSize := proto.Size(qt.result)
|
||||||
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
|
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
|
||||||
|
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
|
||||||
}
|
}
|
||||||
return qt.result, nil
|
return qt.result, nil
|
||||||
}
|
}
|
||||||
|
@ -3068,6 +3069,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
|
||||||
FieldsData: qt.result.FieldsData,
|
FieldsData: qt.result.FieldsData,
|
||||||
}
|
}
|
||||||
sentSize := proto.Size(qt.result)
|
sentSize := proto.Size(qt.result)
|
||||||
|
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
|
||||||
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
|
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,21 +35,22 @@ type showConfigurationsFuncType func(ctx context.Context, request *internalpb.Sh
|
||||||
func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) {
|
func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) {
|
||||||
var err error
|
var err error
|
||||||
rms := make([]metricsinfo.RateMetric, 0)
|
rms := make([]metricsinfo.RateMetric, 0)
|
||||||
getRateMetric := func(rateType internalpb.RateType) {
|
getRateMetric := func(label string) {
|
||||||
rate, err2 := rateCol.Rate(rateType.String(), ratelimitutil.DefaultAvgDuration)
|
rate, err2 := rateCol.Rate(label, ratelimitutil.DefaultAvgDuration)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
err = err2
|
err = err2
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rms = append(rms, metricsinfo.RateMetric{
|
rms = append(rms, metricsinfo.RateMetric{
|
||||||
Label: rateType.String(),
|
Label: label,
|
||||||
Rate: rate,
|
Rate: rate,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
getRateMetric(internalpb.RateType_DMLInsert)
|
getRateMetric(internalpb.RateType_DMLInsert.String())
|
||||||
getRateMetric(internalpb.RateType_DMLDelete)
|
getRateMetric(internalpb.RateType_DMLDelete.String())
|
||||||
getRateMetric(internalpb.RateType_DQLSearch)
|
getRateMetric(internalpb.RateType_DQLSearch.String())
|
||||||
getRateMetric(internalpb.RateType_DQLQuery)
|
getRateMetric(internalpb.RateType_DQLQuery.String())
|
||||||
|
getRateMetric(metricsinfo.ReadResultThroughput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,6 +169,7 @@ func (node *Proxy) initRateCollector() error {
|
||||||
// TODO: add bulkLoad rate
|
// TODO: add bulkLoad rate
|
||||||
rateCol.Register(internalpb.RateType_DQLSearch.String())
|
rateCol.Register(internalpb.RateType_DQLSearch.String())
|
||||||
rateCol.Register(internalpb.RateType_DQLQuery.String())
|
rateCol.Register(internalpb.RateType_DQLQuery.String())
|
||||||
|
rateCol.Register(metricsinfo.ReadResultThroughput)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,7 @@ type Limit = ratelimitutil.Limit
|
||||||
// 3. Disk quota protection -> force deny writing if exceeded
|
// 3. Disk quota protection -> force deny writing if exceeded
|
||||||
// 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed
|
// 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed
|
||||||
// 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed
|
// 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed
|
||||||
|
// 6. Search result protection -> searchRate = curSearchRate * CoolOffSpeed
|
||||||
// If necessary, user can also manually force to deny RW requests.
|
// If necessary, user can also manually force to deny RW requests.
|
||||||
type QuotaCenter struct {
|
type QuotaCenter struct {
|
||||||
// clients
|
// clients
|
||||||
|
@ -285,9 +286,6 @@ func (q *QuotaCenter) calculateReadRates() {
|
||||||
q.forceDenyReading(ManualForceDeny)
|
q.forceDenyReading(ManualForceDeny)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !Params.QuotaConfig.QueueProtectionEnabled {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
coolOffSpeed := Params.QuotaConfig.CoolOffSpeed
|
coolOffSpeed := Params.QuotaConfig.CoolOffSpeed
|
||||||
coolOff := func(realTimeSearchRate float64, realTimeQueryRate float64) {
|
coolOff := func(realTimeSearchRate float64, realTimeQueryRate float64) {
|
||||||
|
@ -316,6 +314,13 @@ func (q *QuotaCenter) calculateReadRates() {
|
||||||
log.Debug("QuotaCenter checkNQInQuery done", zap.Float64("queueLengthFactor", queueLengthFactor))
|
log.Debug("QuotaCenter checkNQInQuery done", zap.Float64("queueLengthFactor", queueLengthFactor))
|
||||||
if Limit(queueLengthFactor) == Limit(coolOffSpeed) {
|
if Limit(queueLengthFactor) == Limit(coolOffSpeed) {
|
||||||
coolOff(realTimeSearchRate, realTimeQueryRate)
|
coolOff(realTimeSearchRate, realTimeQueryRate)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resultRateFactor := q.checkReadResultRate()
|
||||||
|
log.Debug("QuotaCenter checkReadResultRate done", zap.Float64("resultRateFactor", resultRateFactor))
|
||||||
|
if Limit(resultRateFactor) == Limit(coolOffSpeed) {
|
||||||
|
coolOff(realTimeSearchRate, realTimeQueryRate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,15 +448,18 @@ func (q *QuotaCenter) timeTickDelay() (float64, error) {
|
||||||
// checkNQInQuery checks search&query nq in QueryNode,
|
// checkNQInQuery checks search&query nq in QueryNode,
|
||||||
// and return the factor according to NQInQueueThreshold.
|
// and return the factor according to NQInQueueThreshold.
|
||||||
func (q *QuotaCenter) checkNQInQuery() float64 {
|
func (q *QuotaCenter) checkNQInQuery() float64 {
|
||||||
|
if !Params.QuotaConfig.QueueProtectionEnabled {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
sum := func(ri metricsinfo.ReadInfoInQueue) int64 {
|
sum := func(ri metricsinfo.ReadInfoInQueue) int64 {
|
||||||
return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan
|
return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan
|
||||||
}
|
}
|
||||||
|
|
||||||
factor := float64(1)
|
|
||||||
nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold
|
nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold
|
||||||
if nqInQueueThreshold < 0 {
|
if nqInQueueThreshold < 0 {
|
||||||
// < 0 means disable queue length protection
|
// < 0 means disable queue length protection
|
||||||
return factor
|
return 1
|
||||||
}
|
}
|
||||||
for _, metric := range q.queryNodeMetrics {
|
for _, metric := range q.queryNodeMetrics {
|
||||||
searchNQSum := sum(metric.SearchQueue)
|
searchNQSum := sum(metric.SearchQueue)
|
||||||
|
@ -461,17 +469,20 @@ func (q *QuotaCenter) checkNQInQuery() float64 {
|
||||||
return Params.QuotaConfig.CoolOffSpeed
|
return Params.QuotaConfig.CoolOffSpeed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return factor
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkQueryLatency checks queueing latency in QueryNode for search&query requests,
|
// checkQueryLatency checks queueing latency in QueryNode for search&query requests,
|
||||||
// and return the factor according to QueueLatencyThreshold.
|
// and return the factor according to QueueLatencyThreshold.
|
||||||
func (q *QuotaCenter) checkQueryLatency() float64 {
|
func (q *QuotaCenter) checkQueryLatency() float64 {
|
||||||
factor := float64(1)
|
if !Params.QuotaConfig.QueueProtectionEnabled {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold
|
queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold
|
||||||
if queueLatencyThreshold < 0 {
|
if queueLatencyThreshold < 0 {
|
||||||
// < 0 means disable queue latency protection
|
// < 0 means disable queue latency protection
|
||||||
return factor
|
return 1
|
||||||
}
|
}
|
||||||
for _, metric := range q.queryNodeMetrics {
|
for _, metric := range q.queryNodeMetrics {
|
||||||
searchLatency := metric.SearchQueue.AvgQueueDuration
|
searchLatency := metric.SearchQueue.AvgQueueDuration
|
||||||
|
@ -480,7 +491,29 @@ func (q *QuotaCenter) checkQueryLatency() float64 {
|
||||||
return Params.QuotaConfig.CoolOffSpeed
|
return Params.QuotaConfig.CoolOffSpeed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return factor
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkReadResultRate checks search result rate in Proxy,
|
||||||
|
// and return the factor according to MaxReadResultRate.
|
||||||
|
func (q *QuotaCenter) checkReadResultRate() float64 {
|
||||||
|
if !Params.QuotaConfig.ResultProtectionEnabled {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
maxRate := Params.QuotaConfig.MaxReadResultRate
|
||||||
|
rateCount := float64(0)
|
||||||
|
for _, metric := range q.proxyMetrics {
|
||||||
|
for _, rm := range metric.Rms {
|
||||||
|
if rm.Label == metricsinfo.ReadResultThroughput {
|
||||||
|
rateCount += rm.Rate
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if rateCount >= maxRate {
|
||||||
|
return Params.QuotaConfig.CoolOffSpeed
|
||||||
|
}
|
||||||
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// memoryToWaterLevel checks whether any node has memory resource issue,
|
// memoryToWaterLevel checks whether any node has memory resource issue,
|
||||||
|
|
|
@ -244,8 +244,33 @@ func TestQuotaCenter(t *testing.T) {
|
||||||
}}
|
}}
|
||||||
factor = quotaCenter.checkQueryLatency()
|
factor = quotaCenter.checkQueryLatency()
|
||||||
assert.Equal(t, 1.0, factor)
|
assert.Equal(t, 1.0, factor)
|
||||||
//ok := math.Abs(factor-1.0) < 0.0001
|
})
|
||||||
//assert.True(t, ok)
|
|
||||||
|
t.Run("test checkReadResult", func(t *testing.T) {
|
||||||
|
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
|
||||||
|
factor := quotaCenter.checkReadResultRate()
|
||||||
|
assert.Equal(t, float64(1), factor)
|
||||||
|
|
||||||
|
// test cool off
|
||||||
|
Params.QuotaConfig.ResultProtectionEnabled = true
|
||||||
|
Params.QuotaConfig.MaxReadResultRate = 1
|
||||||
|
|
||||||
|
quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{
|
||||||
|
Rms: []metricsinfo.RateMetric{
|
||||||
|
{Label: metricsinfo.ReadResultThroughput, Rate: 1.2},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
factor = quotaCenter.checkReadResultRate()
|
||||||
|
assert.Equal(t, Params.QuotaConfig.CoolOffSpeed, factor)
|
||||||
|
|
||||||
|
// test no cool off
|
||||||
|
quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{
|
||||||
|
Rms: []metricsinfo.RateMetric{
|
||||||
|
{Label: metricsinfo.ReadResultThroughput, Rate: 0.8},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
factor = quotaCenter.checkReadResultRate()
|
||||||
|
assert.Equal(t, 1.0, factor)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test calculateReadRates", func(t *testing.T) {
|
t.Run("test calculateReadRates", func(t *testing.T) {
|
||||||
|
@ -278,6 +303,20 @@ func TestQuotaCenter(t *testing.T) {
|
||||||
quotaCenter.calculateReadRates()
|
quotaCenter.calculateReadRates()
|
||||||
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
|
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
|
||||||
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
|
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
|
||||||
|
|
||||||
|
Params.QuotaConfig.ResultProtectionEnabled = true
|
||||||
|
Params.QuotaConfig.MaxReadResultRate = 1
|
||||||
|
quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{
|
||||||
|
Rms: []metricsinfo.RateMetric{
|
||||||
|
{Label: internalpb.RateType_DQLSearch.String(), Rate: 100},
|
||||||
|
{Label: internalpb.RateType_DQLQuery.String(), Rate: 100},
|
||||||
|
{Label: metricsinfo.ReadResultThroughput, Rate: 1.2},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{SearchQueue: metricsinfo.ReadInfoInQueue{}}}
|
||||||
|
quotaCenter.calculateReadRates()
|
||||||
|
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
|
||||||
|
assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test calculateWriteRates", func(t *testing.T) {
|
t.Run("test calculateWriteRates", func(t *testing.T) {
|
||||||
|
|
|
@ -28,6 +28,7 @@ type RateMetricLabel = string
|
||||||
const (
|
const (
|
||||||
NQPerSecond RateMetricLabel = "NQPerSecond"
|
NQPerSecond RateMetricLabel = "NQPerSecond"
|
||||||
SearchThroughput RateMetricLabel = "SearchThroughput"
|
SearchThroughput RateMetricLabel = "SearchThroughput"
|
||||||
|
ReadResultThroughput RateMetricLabel = "ReadResultThroughput"
|
||||||
InsertConsumeThroughput RateMetricLabel = "InsertConsumeThroughput"
|
InsertConsumeThroughput RateMetricLabel = "InsertConsumeThroughput"
|
||||||
DeleteConsumeThroughput RateMetricLabel = "DeleteConsumeThroughput"
|
DeleteConsumeThroughput RateMetricLabel = "DeleteConsumeThroughput"
|
||||||
)
|
)
|
||||||
|
|
|
@ -29,10 +29,10 @@ import (
|
||||||
const (
|
const (
|
||||||
// defaultMax is the default unlimited rate or threshold.
|
// defaultMax is the default unlimited rate or threshold.
|
||||||
defaultMax = float64(math.MaxFloat64)
|
defaultMax = float64(math.MaxFloat64)
|
||||||
// GB used to convert gigabytes and bytes.
|
// GBSize used to convert gigabytes and bytes.
|
||||||
GB = 1024.0 * 1024.0 * 1024.0
|
GBSize = 1024.0 * 1024.0 * 1024.0
|
||||||
// defaultDiskQuotaInGB is the default disk quota in gigabytes.
|
// defaultDiskQuotaInGB is the default disk quota in gigabytes.
|
||||||
defaultDiskQuotaInGB = defaultMax / GB
|
defaultDiskQuotaInGB = defaultMax / GBSize
|
||||||
// defaultMin is the default minimal rate.
|
// defaultMin is the default minimal rate.
|
||||||
defaultMin = float64(0)
|
defaultMin = float64(0)
|
||||||
// defaultLowWaterLevel is the default memory low water level.
|
// defaultLowWaterLevel is the default memory low water level.
|
||||||
|
@ -93,11 +93,13 @@ type quotaConfig struct {
|
||||||
DiskQuota float64
|
DiskQuota float64
|
||||||
|
|
||||||
// limit reading
|
// limit reading
|
||||||
ForceDenyReading bool
|
ForceDenyReading bool
|
||||||
QueueProtectionEnabled bool
|
QueueProtectionEnabled bool
|
||||||
NQInQueueThreshold int64
|
NQInQueueThreshold int64
|
||||||
QueueLatencyThreshold float64
|
QueueLatencyThreshold float64
|
||||||
CoolOffSpeed float64
|
ResultProtectionEnabled bool
|
||||||
|
MaxReadResultRate float64
|
||||||
|
CoolOffSpeed float64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *quotaConfig) init(base *BaseTable) {
|
func (p *quotaConfig) init(base *BaseTable) {
|
||||||
|
@ -154,6 +156,8 @@ func (p *quotaConfig) init(base *BaseTable) {
|
||||||
p.initQueueProtectionEnabled()
|
p.initQueueProtectionEnabled()
|
||||||
p.initNQInQueueThreshold()
|
p.initNQInQueueThreshold()
|
||||||
p.initQueueLatencyThreshold()
|
p.initQueueLatencyThreshold()
|
||||||
|
p.initResultProtectionEnabled()
|
||||||
|
p.initMaxReadResultRate()
|
||||||
p.initCoolOffSpeed()
|
p.initCoolOffSpeed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -520,7 +524,7 @@ func (p *quotaConfig) initDiskQuota() {
|
||||||
p.DiskQuota = defaultDiskQuotaInGB
|
p.DiskQuota = defaultDiskQuotaInGB
|
||||||
}
|
}
|
||||||
// gigabytes to bytes
|
// gigabytes to bytes
|
||||||
p.DiskQuota = p.DiskQuota * GB
|
p.DiskQuota = p.DiskQuota * GBSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *quotaConfig) initForceDenyReading() {
|
func (p *quotaConfig) initForceDenyReading() {
|
||||||
|
@ -553,13 +557,32 @@ func (p *quotaConfig) initQueueLatencyThreshold() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *quotaConfig) initResultProtectionEnabled() {
|
||||||
|
p.ResultProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitReading.resultProtection.enabled", false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *quotaConfig) initMaxReadResultRate() {
|
||||||
|
if !p.ResultProtectionEnabled {
|
||||||
|
p.MaxReadResultRate = defaultMax
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.MaxReadResultRate = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.resultProtection.maxReadResultRate", defaultMax)
|
||||||
|
if math.Abs(p.MaxReadResultRate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||||
|
p.MaxReadResultRate = megaBytesRate2Bytes(p.MaxReadResultRate)
|
||||||
|
}
|
||||||
|
// [0, inf)
|
||||||
|
if p.MaxReadResultRate < 0 {
|
||||||
|
p.MaxReadResultRate = defaultMax
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *quotaConfig) initCoolOffSpeed() {
|
func (p *quotaConfig) initCoolOffSpeed() {
|
||||||
const defaultSpeed = 0.9
|
const defaultSpeed = 0.9
|
||||||
p.CoolOffSpeed = defaultSpeed
|
p.CoolOffSpeed = defaultSpeed
|
||||||
if !p.QueueProtectionEnabled {
|
if !p.QueueProtectionEnabled {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.coolOffSpeed", defaultSpeed)
|
p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.coolOffSpeed", defaultSpeed)
|
||||||
// (0, 1]
|
// (0, 1]
|
||||||
if p.CoolOffSpeed <= 0 || p.CoolOffSpeed > 1 {
|
if p.CoolOffSpeed <= 0 || p.CoolOffSpeed > 1 {
|
||||||
log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed))
|
log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed))
|
||||||
|
|
|
@ -86,6 +86,8 @@ func TestQuotaParam(t *testing.T) {
|
||||||
assert.Equal(t, false, qc.QueueProtectionEnabled)
|
assert.Equal(t, false, qc.QueueProtectionEnabled)
|
||||||
assert.Equal(t, int64(0), qc.NQInQueueThreshold)
|
assert.Equal(t, int64(0), qc.NQInQueueThreshold)
|
||||||
assert.Equal(t, float64(0), qc.QueueLatencyThreshold)
|
assert.Equal(t, float64(0), qc.QueueLatencyThreshold)
|
||||||
|
assert.Equal(t, false, qc.ResultProtectionEnabled)
|
||||||
|
assert.Equal(t, defaultMax, qc.MaxReadResultRate)
|
||||||
assert.Equal(t, 0.9, qc.CoolOffSpeed)
|
assert.Equal(t, 0.9, qc.CoolOffSpeed)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue