From f1e330a9970f464b87b139b1c667467cd4450780 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 13 Oct 2022 14:57:24 +0800 Subject: [PATCH] Add read result rate protection (#19728) Signed-off-by: bigsheeper Signed-off-by: bigsheeper --- configs/milvus.yaml | 39 ++++++++------- internal/proxy/impl.go | 2 + internal/proxy/metrics_info.go | 15 +++--- internal/proxy/proxy.go | 1 + internal/rootcoord/quota_center.go | 51 ++++++++++++++++---- internal/rootcoord/quota_center_test.go | 43 ++++++++++++++++- internal/util/metricsinfo/quota_metric.go | 1 + internal/util/paramtable/quota_param.go | 43 +++++++++++++---- internal/util/paramtable/quota_param_test.go | 2 + 9 files changed, 152 insertions(+), 45 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8830a8e871..356c2d66d9 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -373,6 +373,7 @@ common: # 1. DML throughput limitation; # 2. DDL, DQL qps/rps limitation; # 3. DQL Queue length/latency protection; +# 4. DQL result rate protection; # If necessary, you can also manually force to deny RW requests. quotaAndLimits: enabled: false # `true` to enable quota and limits, `false` to disable. @@ -383,38 +384,38 @@ quotaAndLimits: ddl: # ddl limit rates, default no limit. 
enabled: false - collectionRate: # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection - partitionRate: # qps, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition + collectionRate: -1 # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection + partitionRate: -1 # qps, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition indexRate: enabled: false - max: # qps, default no limit, rate for CreateIndex, DropIndex + max: -1 # qps, default no limit, rate for CreateIndex, DropIndex flushRate: enabled: false - max: # qps, default no limit, rate for flush + max: -1 # qps, default no limit, rate for flush compactionRate: enabled: false - max: # qps, default no limit, rate for manualCompaction + max: -1 # qps, default no limit, rate for manualCompaction # dml limit rates, default no limit. # The maximum rate will not be greater than `max`. dml: enabled: false insertRate: - max: # MB/s, default no limit + max: -1 # MB/s, default no limit deleteRate: - max: # MB/s, default no limit + max: -1 # MB/s, default no limit bulkLoadRate: # not support yet. TODO: limit bulkLoad rate - max: # MB/s, default no limit + max: -1 # MB/s, default no limit # dql limit rates, default no limit. # The maximum rate will not be greater than `max`. dql: enabled: false searchRate: - max: # vps (vectors per second), default no limit + max: -1 # vps (vectors per second), default no limit queryRate: - max: # qps, default no limit + max: -1 # qps, default no limit # limitWriting decides whether dml requests are allowed. 
limitWriting: @@ -440,29 +441,33 @@ quotaAndLimits: diskProtection: # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected; enabled: true - diskQuota: -1 # GB, (0, +inf), -1 means use default +inf + diskQuota: -1 # GB, (0, +inf), default no limit # limitReading decides whether dql requests are allowed. limitReading: # forceDeny `false` means dql requests are allowed (except for some # specific conditions, such as collection has been dropped), `true` means always reject all dql requests. forceDeny: false - queueProtection: enabled: false # nqInQueueThreshold indicated that the system was under backpressure for Search/Query path. # If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off # until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1. - nqInQueueThreshold: # int, default no limit + nqInQueueThreshold: -1 # int, default no limit # queueLatencyThreshold indicated that the system was under backpressure for Search/Query path. # If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off # until the latency of queuing no longer exceeds queueLatencyThreshold. # The latency here refers to the averaged latency over a period of time. - queueLatencyThreshold: # milliseconds, default no limit - - # coolOffSpeed is the speed of search&query rates cool off. - coolOffSpeed: 0.9 # (0, 1] + queueLatencyThreshold: -1 # milliseconds, default no limit + resultProtection: + enabled: false + # maxReadResultRate is the threshold of the dql (search&query) result rate for backpressure on the Search/Query path. + # If the dql result rate reaches maxReadResultRate, search&query rates would gradually cool off + # until the read result rate drops below maxReadResultRate. + maxReadResultRate: -1 # MB/s, default no limit + # coolOffSpeed is the factor by which search&query rates are multiplied on each cool off. 
+ coolOffSpeed: 0.9 # (0, 1] # AutoIndexConfig autoIndex: diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index c9a5ec60ec..92b136c5e8 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2850,6 +2850,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) if qt.result != nil { sentSize := proto.Size(qt.result) metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize)) + rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize)) } return qt.result, nil } @@ -3068,6 +3069,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* FieldsData: qt.result.FieldsData, } sentSize := proto.Size(qt.result) + rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize)) metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize)) return ret, nil } diff --git a/internal/proxy/metrics_info.go b/internal/proxy/metrics_info.go index 8fb39340bd..bd2f5cc288 100644 --- a/internal/proxy/metrics_info.go +++ b/internal/proxy/metrics_info.go @@ -35,21 +35,22 @@ type showConfigurationsFuncType func(ctx context.Context, request *internalpb.Sh func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) { var err error rms := make([]metricsinfo.RateMetric, 0) - getRateMetric := func(rateType internalpb.RateType) { - rate, err2 := rateCol.Rate(rateType.String(), ratelimitutil.DefaultAvgDuration) + getRateMetric := func(label string) { + rate, err2 := rateCol.Rate(label, ratelimitutil.DefaultAvgDuration) if err2 != nil { err = err2 return } rms = append(rms, metricsinfo.RateMetric{ - Label: rateType.String(), + Label: label, Rate: rate, }) } - getRateMetric(internalpb.RateType_DMLInsert) - getRateMetric(internalpb.RateType_DMLDelete) - getRateMetric(internalpb.RateType_DQLSearch) - getRateMetric(internalpb.RateType_DQLQuery) + 
getRateMetric(internalpb.RateType_DMLInsert.String()) + getRateMetric(internalpb.RateType_DMLDelete.String()) + getRateMetric(internalpb.RateType_DQLSearch.String()) + getRateMetric(internalpb.RateType_DQLQuery.String()) + getRateMetric(metricsinfo.ReadResultThroughput) if err != nil { return nil, err } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 139d5fb97a..2e4aa1b06e 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -169,6 +169,7 @@ func (node *Proxy) initRateCollector() error { // TODO: add bulkLoad rate rateCol.Register(internalpb.RateType_DQLSearch.String()) rateCol.Register(internalpb.RateType_DQLQuery.String()) + rateCol.Register(metricsinfo.ReadResultThroughput) return nil } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index af910b2dec..d6c854e34c 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -77,6 +77,7 @@ type Limit = ratelimitutil.Limit // 3. Disk quota protection -> force deny writing if exceeded // 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed // 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed +// 6. DQL result rate protection -> dqlRate = curDQLRate * CoolOffSpeed // If necessary, user can also manually force to deny RW requests. 
type QuotaCenter struct { // clients @@ -285,9 +286,6 @@ func (q *QuotaCenter) calculateReadRates() { q.forceDenyReading(ManualForceDeny) return } - if !Params.QuotaConfig.QueueProtectionEnabled { - return - } coolOffSpeed := Params.QuotaConfig.CoolOffSpeed coolOff := func(realTimeSearchRate float64, realTimeQueryRate float64) { @@ -316,6 +314,13 @@ func (q *QuotaCenter) calculateReadRates() { log.Debug("QuotaCenter checkNQInQuery done", zap.Float64("queueLengthFactor", queueLengthFactor)) if Limit(queueLengthFactor) == Limit(coolOffSpeed) { coolOff(realTimeSearchRate, realTimeQueryRate) + return + } + + resultRateFactor := q.checkReadResultRate() + log.Debug("QuotaCenter checkReadResultRate done", zap.Float64("resultRateFactor", resultRateFactor)) + if Limit(resultRateFactor) == Limit(coolOffSpeed) { + coolOff(realTimeSearchRate, realTimeQueryRate) } } @@ -443,15 +448,18 @@ func (q *QuotaCenter) timeTickDelay() (float64, error) { // checkNQInQuery checks search&query nq in QueryNode, // and return the factor according to NQInQueueThreshold. func (q *QuotaCenter) checkNQInQuery() float64 { + if !Params.QuotaConfig.QueueProtectionEnabled { + return 1 + } + sum := func(ri metricsinfo.ReadInfoInQueue) int64 { return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan } - factor := float64(1) nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold if nqInQueueThreshold < 0 { // < 0 means disable queue length protection - return factor + return 1 } for _, metric := range q.queryNodeMetrics { searchNQSum := sum(metric.SearchQueue) @@ -461,17 +469,20 @@ func (q *QuotaCenter) checkNQInQuery() float64 { return Params.QuotaConfig.CoolOffSpeed } } - return factor + return 1 } // checkQueryLatency checks queueing latency in QueryNode for search&query requests, // and return the factor according to QueueLatencyThreshold. 
func (q *QuotaCenter) checkQueryLatency() float64 { - factor := float64(1) + if !Params.QuotaConfig.QueueProtectionEnabled { + return 1 + } + queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold if queueLatencyThreshold < 0 { // < 0 means disable queue latency protection - return factor + return 1 } for _, metric := range q.queryNodeMetrics { searchLatency := metric.SearchQueue.AvgQueueDuration @@ -480,7 +491,29 @@ func (q *QuotaCenter) checkQueryLatency() float64 { return Params.QuotaConfig.CoolOffSpeed } } - return factor + return 1 +} + +// checkReadResultRate sums the read (search&query) result rates reported by all Proxies, +// and returns CoolOffSpeed if the sum reaches MaxReadResultRate, or 1 otherwise. +func (q *QuotaCenter) checkReadResultRate() float64 { + if !Params.QuotaConfig.ResultProtectionEnabled { + return 1 + } + + maxRate := Params.QuotaConfig.MaxReadResultRate + rateCount := float64(0) + for _, metric := range q.proxyMetrics { + for _, rm := range metric.Rms { + if rm.Label == metricsinfo.ReadResultThroughput { + rateCount += rm.Rate + } + } + } + if rateCount >= maxRate { + return Params.QuotaConfig.CoolOffSpeed + } + return 1 } // memoryToWaterLevel checks whether any node has memory resource issue, diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 8f672431e8..bd9c4b7063 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -244,8 +244,33 @@ func TestQuotaCenter(t *testing.T) { }} factor = quotaCenter.checkQueryLatency() assert.Equal(t, 1.0, factor) - //ok := math.Abs(factor-1.0) < 0.0001 - //assert.True(t, ok) + }) + + t.Run("test checkReadResult", func(t *testing.T) { + quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) + factor := quotaCenter.checkReadResultRate() + assert.Equal(t, float64(1), factor) + + // test cool off + Params.QuotaConfig.ResultProtectionEnabled = true + Params.QuotaConfig.MaxReadResultRate = 1 + 
quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{ + Rms: []metricsinfo.RateMetric{ + {Label: metricsinfo.ReadResultThroughput, Rate: 1.2}, + }, + }} + factor = quotaCenter.checkReadResultRate() + assert.Equal(t, Params.QuotaConfig.CoolOffSpeed, factor) + + // test no cool off + quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{ + Rms: []metricsinfo.RateMetric{ + {Label: metricsinfo.ReadResultThroughput, Rate: 0.8}, + }, + }} + factor = quotaCenter.checkReadResultRate() + assert.Equal(t, 1.0, factor) }) t.Run("test calculateReadRates", func(t *testing.T) { @@ -278,6 +303,20 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.calculateReadRates() assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch]) assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) + + Params.QuotaConfig.ResultProtectionEnabled = true + Params.QuotaConfig.MaxReadResultRate = 1 + quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{ + Rms: []metricsinfo.RateMetric{ + {Label: internalpb.RateType_DQLSearch.String(), Rate: 100}, + {Label: internalpb.RateType_DQLQuery.String(), Rate: 100}, + {Label: metricsinfo.ReadResultThroughput, Rate: 1.2}, + }, + }} + quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{SearchQueue: metricsinfo.ReadInfoInQueue{}}} + quotaCenter.calculateReadRates() + assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch]) + assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) }) t.Run("test calculateWriteRates", func(t *testing.T) { diff --git a/internal/util/metricsinfo/quota_metric.go b/internal/util/metricsinfo/quota_metric.go index 320450b1f1..dd5a3fe70e 100644 --- a/internal/util/metricsinfo/quota_metric.go +++ b/internal/util/metricsinfo/quota_metric.go @@ -28,6 +28,7 @@ type RateMetricLabel = string const ( NQPerSecond RateMetricLabel = "NQPerSecond" SearchThroughput RateMetricLabel = 
"SearchThroughput" + ReadResultThroughput RateMetricLabel = "ReadResultThroughput" InsertConsumeThroughput RateMetricLabel = "InsertConsumeThroughput" DeleteConsumeThroughput RateMetricLabel = "DeleteConsumeThroughput" ) diff --git a/internal/util/paramtable/quota_param.go b/internal/util/paramtable/quota_param.go index cc91a8a8b5..6b76301d44 100644 --- a/internal/util/paramtable/quota_param.go +++ b/internal/util/paramtable/quota_param.go @@ -29,10 +29,10 @@ import ( const ( // defaultMax is the default unlimited rate or threshold. defaultMax = float64(math.MaxFloat64) - // GB used to convert gigabytes and bytes. - GB = 1024.0 * 1024.0 * 1024.0 + // GBSize used to convert gigabytes and bytes. + GBSize = 1024.0 * 1024.0 * 1024.0 // defaultDiskQuotaInGB is the default disk quota in gigabytes. - defaultDiskQuotaInGB = defaultMax / GB + defaultDiskQuotaInGB = defaultMax / GBSize // defaultMin is the default minimal rate. defaultMin = float64(0) // defaultLowWaterLevel is the default memory low water level. 
@@ -93,11 +93,13 @@ type quotaConfig struct { DiskQuota float64 // limit reading - ForceDenyReading bool - QueueProtectionEnabled bool - NQInQueueThreshold int64 - QueueLatencyThreshold float64 - CoolOffSpeed float64 + ForceDenyReading bool + QueueProtectionEnabled bool + NQInQueueThreshold int64 + QueueLatencyThreshold float64 + ResultProtectionEnabled bool + MaxReadResultRate float64 + CoolOffSpeed float64 } func (p *quotaConfig) init(base *BaseTable) { @@ -154,6 +156,8 @@ func (p *quotaConfig) init(base *BaseTable) { p.initQueueProtectionEnabled() p.initNQInQueueThreshold() p.initQueueLatencyThreshold() + p.initResultProtectionEnabled() + p.initMaxReadResultRate() p.initCoolOffSpeed() } @@ -520,7 +524,7 @@ func (p *quotaConfig) initDiskQuota() { p.DiskQuota = defaultDiskQuotaInGB } // gigabytes to bytes - p.DiskQuota = p.DiskQuota * GB + p.DiskQuota = p.DiskQuota * GBSize } func (p *quotaConfig) initForceDenyReading() { @@ -553,13 +557,32 @@ func (p *quotaConfig) initQueueLatencyThreshold() { } } +func (p *quotaConfig) initResultProtectionEnabled() { + p.ResultProtectionEnabled = p.Base.ParseBool("quotaAndLimits.limitReading.resultProtection.enabled", false) +} + +func (p *quotaConfig) initMaxReadResultRate() { + if !p.ResultProtectionEnabled { + p.MaxReadResultRate = defaultMax + return + } + p.MaxReadResultRate = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.resultProtection.maxReadResultRate", defaultMax) + if math.Abs(p.MaxReadResultRate-defaultMax) > 0.001 { // maxRate != defaultMax + p.MaxReadResultRate = megaBytesRate2Bytes(p.MaxReadResultRate) + } + // [0, inf) + if p.MaxReadResultRate < 0 { + p.MaxReadResultRate = defaultMax + } +} + func (p *quotaConfig) initCoolOffSpeed() { const defaultSpeed = 0.9 p.CoolOffSpeed = defaultSpeed - if !p.QueueProtectionEnabled { + if !p.QueueProtectionEnabled && !p.ResultProtectionEnabled { return } - p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.coolOffSpeed", defaultSpeed) + p.CoolOffSpeed = 
p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.coolOffSpeed", defaultSpeed) // (0, 1] if p.CoolOffSpeed <= 0 || p.CoolOffSpeed > 1 { log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed)) diff --git a/internal/util/paramtable/quota_param_test.go b/internal/util/paramtable/quota_param_test.go index 0180c2c63a..d765b33638 100644 --- a/internal/util/paramtable/quota_param_test.go +++ b/internal/util/paramtable/quota_param_test.go @@ -86,6 +86,8 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, false, qc.QueueProtectionEnabled) assert.Equal(t, int64(0), qc.NQInQueueThreshold) assert.Equal(t, float64(0), qc.QueueLatencyThreshold) + assert.Equal(t, false, qc.ResultProtectionEnabled) + assert.Equal(t, defaultMax, qc.MaxReadResultRate) assert.Equal(t, 0.9, qc.CoolOffSpeed) }) }