mirror of https://github.com/milvus-io/milvus.git
				
				
				
			enhance: remove cooling off in rate limiter for read requests (#35935)
issue: #35934 Signed-off-by: jaime <yun.zhang@zilliz.com>pull/35974/head
							parent
							
								
									919e96ac22
								
							
						
					
					
						commit
						24fb10114b
					
				| 
						 | 
				
			
			@ -983,31 +983,6 @@ quotaAndLimits:
 | 
			
		|||
    # forceDeny false means dql requests are allowed (except for some
 | 
			
		||||
    # specific conditions, such as collection has been dropped), true means always reject all dql requests.
 | 
			
		||||
    forceDeny: false
 | 
			
		||||
    queueProtection:
 | 
			
		||||
      enabled: false
 | 
			
		||||
      # nqInQueueThreshold indicated that the system was under backpressure for Search/Query path.
 | 
			
		||||
      # If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off
 | 
			
		||||
      # until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1.
 | 
			
		||||
      # int, default no limit
 | 
			
		||||
      nqInQueueThreshold: -1
 | 
			
		||||
      # queueLatencyThreshold indicated that the system was under backpressure for Search/Query path.
 | 
			
		||||
      # If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off
 | 
			
		||||
      # until the latency of queuing no longer exceeds queueLatencyThreshold.
 | 
			
		||||
      # The latency here refers to the averaged latency over a period of time.
 | 
			
		||||
      # milliseconds, default no limit
 | 
			
		||||
      queueLatencyThreshold: -1
 | 
			
		||||
    resultProtection:
 | 
			
		||||
      enabled: false
 | 
			
		||||
      # maxReadResultRate indicated that the system was under backpressure for Search/Query path.
 | 
			
		||||
      # If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off
 | 
			
		||||
      # until the read result rate no longer exceeds maxReadResultRate.
 | 
			
		||||
      # MB/s, default no limit
 | 
			
		||||
      maxReadResultRate: -1
 | 
			
		||||
      maxReadResultRatePerDB: -1
 | 
			
		||||
      maxReadResultRatePerCollection: -1
 | 
			
		||||
    # colOffSpeed is the speed of search&query rates cool off.
 | 
			
		||||
    # (0, 1]
 | 
			
		||||
    coolOffSpeed: 0.9
 | 
			
		||||
 | 
			
		||||
trace:
 | 
			
		||||
  # trace exporter type, default is stdout,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2921,9 +2921,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
 | 
			
		|||
		request.GetCollectionName(),
 | 
			
		||||
	).Add(float64(request.GetNq()))
 | 
			
		||||
 | 
			
		||||
	subLabel := GetCollectionRateSubLabel(request)
 | 
			
		||||
	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()), subLabel)
 | 
			
		||||
 | 
			
		||||
	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
 | 
			
		||||
		return &milvuspb.SearchResults{
 | 
			
		||||
			Status: merr.Status(err),
 | 
			
		||||
| 
						 | 
				
			
			@ -3100,7 +3097,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
 | 
			
		||||
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
 | 
			
		||||
	}
 | 
			
		||||
	return qt.result, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -3131,13 +3127,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
 | 
			
		|||
		request.GetCollectionName(),
 | 
			
		||||
	).Add(float64(receiveSize))
 | 
			
		||||
 | 
			
		||||
	subLabel := GetCollectionRateSubLabel(request)
 | 
			
		||||
	allNQ := int64(0)
 | 
			
		||||
	for _, searchRequest := range request.Requests {
 | 
			
		||||
		allNQ += searchRequest.GetNq()
 | 
			
		||||
	}
 | 
			
		||||
	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(allNQ), subLabel)
 | 
			
		||||
 | 
			
		||||
	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
 | 
			
		||||
		return &milvuspb.SearchResults{
 | 
			
		||||
			Status: merr.Status(err),
 | 
			
		||||
| 
						 | 
				
			
			@ -3297,7 +3286,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
 | 
			
		||||
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
 | 
			
		||||
	}
 | 
			
		||||
	return qt.result, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -3612,7 +3600,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
 | 
			
		|||
	).Inc()
 | 
			
		||||
 | 
			
		||||
	sentSize := proto.Size(qt.result)
 | 
			
		||||
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
 | 
			
		||||
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
 | 
			
		||||
 | 
			
		||||
	username := GetCurUserFromContextOrDefault(ctx)
 | 
			
		||||
| 
						 | 
				
			
			@ -6438,5 +6425,4 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR
 | 
			
		|||
func DeregisterSubLabel(subLabel string) {
 | 
			
		||||
	rateCol.DeregisterSubLabel(internalpb.RateType_DQLQuery.String(), subLabel)
 | 
			
		||||
	rateCol.DeregisterSubLabel(internalpb.RateType_DQLSearch.String(), subLabel)
 | 
			
		||||
	rateCol.DeregisterSubLabel(metricsinfo.ReadResultThroughput, subLabel)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -71,8 +71,6 @@ func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) {
 | 
			
		|||
	getSubLabelRateMetric(internalpb.RateType_DQLSearch.String())
 | 
			
		||||
	getRateMetric(internalpb.RateType_DQLQuery.String())
 | 
			
		||||
	getSubLabelRateMetric(internalpb.RateType_DQLQuery.String())
 | 
			
		||||
	getRateMetric(metricsinfo.ReadResultThroughput)
 | 
			
		||||
	getSubLabelRateMetric(metricsinfo.ReadResultThroughput)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -207,7 +207,6 @@ func (node *Proxy) initRateCollector() error {
 | 
			
		|||
	// TODO: add bulkLoad rate
 | 
			
		||||
	rateCol.Register(internalpb.RateType_DQLSearch.String())
 | 
			
		||||
	rateCol.Register(internalpb.RateType_DQLQuery.String())
 | 
			
		||||
	rateCol.Register(metricsinfo.ReadResultThroughput)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -32,31 +32,11 @@ var Counter *counter
 | 
			
		|||
 | 
			
		||||
func RateMetrics() []string {
 | 
			
		||||
	return []string{
 | 
			
		||||
		metricsinfo.NQPerSecond,
 | 
			
		||||
		metricsinfo.SearchThroughput,
 | 
			
		||||
		metricsinfo.InsertConsumeThroughput,
 | 
			
		||||
		metricsinfo.DeleteConsumeThroughput,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func AverageMetrics() []string {
 | 
			
		||||
	return []string{
 | 
			
		||||
		metricsinfo.QueryQueueMetric,
 | 
			
		||||
		metricsinfo.SearchQueueMetric,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ConstructLabel(subs ...string) string {
 | 
			
		||||
	label := ""
 | 
			
		||||
	for id, sub := range subs {
 | 
			
		||||
		label += sub
 | 
			
		||||
		if id != len(subs)-1 {
 | 
			
		||||
			label += "-"
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return label
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	var err error
 | 
			
		||||
	Rate, err = ratelimitutil.NewRateCollector(ratelimitutil.DefaultWindow, ratelimitutil.DefaultGranularity, false)
 | 
			
		||||
| 
						 | 
				
			
			@ -70,9 +50,4 @@ func init() {
 | 
			
		|||
	for _, label := range RateMetrics() {
 | 
			
		||||
		Rate.Register(label)
 | 
			
		||||
	}
 | 
			
		||||
	// init average metric
 | 
			
		||||
 | 
			
		||||
	for _, label := range AverageMetrics() {
 | 
			
		||||
		Average.Register(label)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,7 +19,6 @@ package querynodev2
 | 
			
		|||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/samber/lo"
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -51,40 +50,6 @@ func getRateMetric() ([]metricsinfo.RateMetric, error) {
 | 
			
		|||
	return rms, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getSearchNQInQueue() (metricsinfo.ReadInfoInQueue, error) {
 | 
			
		||||
	average, err := collector.Average.Average(metricsinfo.SearchQueueMetric)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return metricsinfo.ReadInfoInQueue{}, err
 | 
			
		||||
	}
 | 
			
		||||
	defer collector.Average.Reset(metricsinfo.SearchQueueMetric)
 | 
			
		||||
 | 
			
		||||
	readyQueueLabel := collector.ConstructLabel(metricsinfo.ReadyQueueType, metricsinfo.SearchQueueMetric)
 | 
			
		||||
	executeQueueLabel := collector.ConstructLabel(metricsinfo.ExecuteQueueType, metricsinfo.SearchQueueMetric)
 | 
			
		||||
 | 
			
		||||
	return metricsinfo.ReadInfoInQueue{
 | 
			
		||||
		ReadyQueue:       collector.Counter.Get(readyQueueLabel),
 | 
			
		||||
		ExecuteChan:      collector.Counter.Get(executeQueueLabel),
 | 
			
		||||
		AvgQueueDuration: time.Duration(int64(average)),
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getQueryTasksInQueue() (metricsinfo.ReadInfoInQueue, error) {
 | 
			
		||||
	average, err := collector.Average.Average(metricsinfo.QueryQueueMetric)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return metricsinfo.ReadInfoInQueue{}, err
 | 
			
		||||
	}
 | 
			
		||||
	defer collector.Average.Reset(metricsinfo.QueryQueueMetric)
 | 
			
		||||
 | 
			
		||||
	readyQueueLabel := collector.ConstructLabel(metricsinfo.ReadyQueueType, metricsinfo.QueryQueueMetric)
 | 
			
		||||
	executeQueueLabel := collector.ConstructLabel(metricsinfo.ExecuteQueueType, metricsinfo.QueryQueueMetric)
 | 
			
		||||
 | 
			
		||||
	return metricsinfo.ReadInfoInQueue{
 | 
			
		||||
		ReadyQueue:       collector.Counter.Get(readyQueueLabel),
 | 
			
		||||
		ExecuteChan:      collector.Counter.Get(executeQueueLabel),
 | 
			
		||||
		AvgQueueDuration: time.Duration(int64(average)),
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getQuotaMetrics returns QueryNodeQuotaMetrics.
 | 
			
		||||
func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) {
 | 
			
		||||
	rms, err := getRateMetric()
 | 
			
		||||
| 
						 | 
				
			
			@ -92,16 +57,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
 | 
			
		|||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sqms, err := getSearchNQInQueue()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	qqms, err := getQueryTasksInQueue()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	minTsafeChannel, minTsafe := node.tSafeManager.Min()
 | 
			
		||||
	collections := node.manager.Collection.List()
 | 
			
		||||
	nodeID := fmt.Sprint(node.GetNodeID())
 | 
			
		||||
| 
						 | 
				
			
			@ -178,8 +133,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
 | 
			
		|||
			MinFlowGraphTt:      minTsafe,
 | 
			
		||||
			NumFlowGraph:        node.pipelineManager.Num(),
 | 
			
		||||
		},
 | 
			
		||||
		SearchQueue:         sqms,
 | 
			
		||||
		QueryQueue:          qqms,
 | 
			
		||||
		GrowingSegmentsSize: totalGrowingSize,
 | 
			
		||||
		Effect: metricsinfo.NodeEffect{
 | 
			
		||||
			NodeID:        node.GetNodeID(),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -38,7 +38,6 @@ import (
 | 
			
		|||
	"github.com/milvus-io/milvus/internal/proto/internalpb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/querypb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/segcorepb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/collector"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/segments"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/tasks"
 | 
			
		||||
| 
						 | 
				
			
			@ -810,8 +809,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 | 
			
		|||
		WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.ReduceShards, metrics.BatchReduce).
 | 
			
		||||
		Observe(float64(reduceLatency.Milliseconds()))
 | 
			
		||||
 | 
			
		||||
	collector.Rate.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
 | 
			
		||||
	collector.Rate.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
 | 
			
		||||
	metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.SearchLabel).
 | 
			
		||||
		Add(float64(proto.Size(req)))
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -958,7 +955,6 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
 | 
			
		|||
		metrics.QueryLabel, metrics.ReduceShards, metrics.BatchReduce).
 | 
			
		||||
		Observe(float64(reduceLatency.Milliseconds()))
 | 
			
		||||
 | 
			
		||||
	collector.Rate.Add(metricsinfo.NQPerSecond, 1)
 | 
			
		||||
	metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req)))
 | 
			
		||||
	relatedDataSize := lo.Reduce(toMergeResults, func(acc int64, result *internalpb.RetrieveResults, _ int) int64 {
 | 
			
		||||
		return acc + result.GetCostAggregation().GetTotalRelatedDataSize()
 | 
			
		||||
| 
						 | 
				
			
			@ -1021,7 +1017,6 @@ func (node *QueryNode) QueryStream(req *querypb.QueryRequest, srv querypb.QueryN
 | 
			
		|||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	collector.Rate.Add(metricsinfo.NQPerSecond, 1)
 | 
			
		||||
	metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req)))
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,11 +14,9 @@ import (
 | 
			
		|||
	"github.com/milvus-io/milvus/internal/proto/internalpb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/querypb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/segcorepb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/collector"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/segments"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/metrics"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/merr"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/paramtable"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/timerecord"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/typeutil"
 | 
			
		||||
| 
						 | 
				
			
			@ -87,8 +85,6 @@ func (t *QueryTask) PreExecute() error {
 | 
			
		|||
		username).
 | 
			
		||||
		Observe(inQueueDurationMS)
 | 
			
		||||
 | 
			
		||||
	// Update collector for query node quota.
 | 
			
		||||
	collector.Average.Add(metricsinfo.QueryQueueMetric, float64(inQueueDuration.Microseconds()))
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,13 +19,11 @@ import (
 | 
			
		|||
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/internalpb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/querypb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/collector"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/querynodev2/segments"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/log"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/metrics"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/funcutil"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/merr"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/paramtable"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/timerecord"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/typeutil"
 | 
			
		||||
| 
						 | 
				
			
			@ -120,9 +118,6 @@ func (t *SearchTask) PreExecute() error {
 | 
			
		|||
		username).
 | 
			
		||||
		Observe(inQueueDurationMS)
 | 
			
		||||
 | 
			
		||||
	// Update collector for query node quota.
 | 
			
		||||
	collector.Average.Add(metricsinfo.SearchQueueMetric, float64(inQueueDuration.Microseconds()))
 | 
			
		||||
 | 
			
		||||
	// Execute merged task's PreExecute.
 | 
			
		||||
	for _, subTask := range t.others {
 | 
			
		||||
		err := subTask.PreExecute()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -685,236 +685,6 @@ func (q *QuotaCenter) getDenyReadingDBs() map[int64]struct{} {
 | 
			
		|||
	return dbIDs
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getReadRates get rate information of collections and databases from proxy metrics
 | 
			
		||||
func (q *QuotaCenter) getReadRates() (map[string]float64, map[string]map[string]map[string]float64) {
 | 
			
		||||
	// label metric
 | 
			
		||||
	metricMap := make(map[string]float64)
 | 
			
		||||
	// sub label metric, label -> db -> collection -> value
 | 
			
		||||
	collectionMetricMap := make(map[string]map[string]map[string]float64)
 | 
			
		||||
	for _, metric := range q.proxyMetrics {
 | 
			
		||||
		for _, rm := range metric.Rms {
 | 
			
		||||
			if !ratelimitutil.IsSubLabel(rm.Label) {
 | 
			
		||||
				metricMap[rm.Label] += rm.Rate
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			mainLabel, database, collection, ok := ratelimitutil.SplitCollectionSubLabel(rm.Label)
 | 
			
		||||
			if !ok {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			labelMetric, ok := collectionMetricMap[mainLabel]
 | 
			
		||||
			if !ok {
 | 
			
		||||
				labelMetric = make(map[string]map[string]float64)
 | 
			
		||||
				collectionMetricMap[mainLabel] = labelMetric
 | 
			
		||||
			}
 | 
			
		||||
			databaseMetric, ok := labelMetric[database]
 | 
			
		||||
			if !ok {
 | 
			
		||||
				databaseMetric = make(map[string]float64)
 | 
			
		||||
				labelMetric[database] = databaseMetric
 | 
			
		||||
			}
 | 
			
		||||
			databaseMetric[collection] += rm.Rate
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return metricMap, collectionMetricMap
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *QuotaCenter) getLimitedDBAndCollections(metricMap map[string]float64,
 | 
			
		||||
	collectionMetricMap map[string]map[string]map[string]float64,
 | 
			
		||||
) (bool, *typeutil.Set[string], *typeutil.Set[string]) {
 | 
			
		||||
	limitDBNameSet := typeutil.NewSet[string]()
 | 
			
		||||
	limitCollectionNameSet := typeutil.NewSet[string]()
 | 
			
		||||
	clusterLimit := false
 | 
			
		||||
 | 
			
		||||
	formatCollctionRateKey := func(dbName, collectionName string) string {
 | 
			
		||||
		return fmt.Sprintf("%s.%s", dbName, collectionName)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	enableResultProtection := Params.QuotaConfig.ResultProtectionEnabled.GetAsBool()
 | 
			
		||||
	if enableResultProtection {
 | 
			
		||||
		maxRate := Params.QuotaConfig.MaxReadResultRate.GetAsFloat()
 | 
			
		||||
		maxDBRate := Params.QuotaConfig.MaxReadResultRatePerDB.GetAsFloat()
 | 
			
		||||
		maxCollectionRate := Params.QuotaConfig.MaxReadResultRatePerCollection.GetAsFloat()
 | 
			
		||||
 | 
			
		||||
		dbRateCount := make(map[string]float64)
 | 
			
		||||
		collectionRateCount := make(map[string]float64)
 | 
			
		||||
		rateCount := metricMap[metricsinfo.ReadResultThroughput]
 | 
			
		||||
		for mainLabel, labelMetric := range collectionMetricMap {
 | 
			
		||||
			if mainLabel != metricsinfo.ReadResultThroughput {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			for database, databaseMetric := range labelMetric {
 | 
			
		||||
				for collection, metricValue := range databaseMetric {
 | 
			
		||||
					dbRateCount[database] += metricValue
 | 
			
		||||
					collectionRateCount[formatCollctionRateKey(database, collection)] = metricValue
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if rateCount >= maxRate {
 | 
			
		||||
			clusterLimit = true
 | 
			
		||||
		}
 | 
			
		||||
		for s, f := range dbRateCount {
 | 
			
		||||
			if f >= maxDBRate {
 | 
			
		||||
				limitDBNameSet.Insert(s)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		for s, f := range collectionRateCount {
 | 
			
		||||
			if f >= maxCollectionRate {
 | 
			
		||||
				limitCollectionNameSet.Insert(s)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return clusterLimit, &limitDBNameSet, &limitCollectionNameSet
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *QuotaCenter) coolOffDatabaseReading(deniedDatabaseIDs map[int64]struct{}, limitDBNameSet *typeutil.Set[string],
 | 
			
		||||
	collectionMetricMap map[string]map[string]map[string]float64, log *log.MLogger,
 | 
			
		||||
) error {
 | 
			
		||||
	if limitDBNameSet.Len() > 0 {
 | 
			
		||||
		databaseSearchRate := make(map[string]float64)
 | 
			
		||||
		databaseQueryRate := make(map[string]float64)
 | 
			
		||||
		for mainLabel, labelMetric := range collectionMetricMap {
 | 
			
		||||
			var databaseRate map[string]float64
 | 
			
		||||
			if mainLabel == internalpb.RateType_DQLSearch.String() {
 | 
			
		||||
				databaseRate = databaseSearchRate
 | 
			
		||||
			} else if mainLabel == internalpb.RateType_DQLQuery.String() {
 | 
			
		||||
				databaseRate = databaseQueryRate
 | 
			
		||||
			} else {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			for database, databaseMetric := range labelMetric {
 | 
			
		||||
				for _, metricValue := range databaseMetric {
 | 
			
		||||
					databaseRate[database] += metricValue
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat()
 | 
			
		||||
		limitDBNameSet.Range(func(name string) bool {
 | 
			
		||||
			dbID, ok := q.dbs.Get(name)
 | 
			
		||||
			if !ok {
 | 
			
		||||
				log.Warn("db not found", zap.String("dbName", name))
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// skip this database because it has been denied access for reading
 | 
			
		||||
			_, ok = deniedDatabaseIDs[dbID]
 | 
			
		||||
			if ok {
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			dbLimiter := q.rateLimiter.GetDatabaseLimiters(dbID)
 | 
			
		||||
			if dbLimiter == nil {
 | 
			
		||||
				log.Warn("database limiter not found", zap.Int64("dbID", dbID))
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			realTimeSearchRate := databaseSearchRate[name]
 | 
			
		||||
			realTimeQueryRate := databaseQueryRate[name]
 | 
			
		||||
			q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, dbLimiter, log)
 | 
			
		||||
			return true
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *QuotaCenter) coolOffCollectionReading(deniedDatabaseIDs map[int64]struct{}, limitCollectionSet *typeutil.UniqueSet, limitCollectionNameSet *typeutil.Set[string],
 | 
			
		||||
	collectionMetricMap map[string]map[string]map[string]float64, log *log.MLogger,
 | 
			
		||||
) error {
 | 
			
		||||
	var updateLimitErr error
 | 
			
		||||
	coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat()
 | 
			
		||||
 | 
			
		||||
	splitCollctionRateKey := func(key string) (string, string) {
 | 
			
		||||
		parts := strings.Split(key, ".")
 | 
			
		||||
		return parts[0], parts[1]
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dbIDs := make(map[int64]string, q.dbs.Len())
 | 
			
		||||
	collectionIDs := make(map[int64]string, q.collections.Len())
 | 
			
		||||
	q.dbs.Range(func(name string, id int64) bool {
 | 
			
		||||
		dbIDs[id] = name
 | 
			
		||||
		return true
 | 
			
		||||
	})
 | 
			
		||||
	q.collections.Range(func(name string, id int64) bool {
 | 
			
		||||
		_, collectionName := SplitCollectionKey(name)
 | 
			
		||||
		collectionIDs[id] = collectionName
 | 
			
		||||
		return true
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	limitCollectionNameSet.Range(func(name string) bool {
 | 
			
		||||
		dbName, collectionName := splitCollctionRateKey(name)
 | 
			
		||||
		dbID, ok := q.dbs.Get(dbName)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			log.Warn("db not found", zap.String("dbName", dbName))
 | 
			
		||||
			updateLimitErr = fmt.Errorf("db not found: %s", dbName)
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
		collectionID, ok := q.collections.Get(FormatCollectionKey(dbID, collectionName))
 | 
			
		||||
		if !ok {
 | 
			
		||||
			log.Warn("collection not found", zap.String("collectionName", name))
 | 
			
		||||
			updateLimitErr = fmt.Errorf("collection not found: %s", name)
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
		limitCollectionSet.Insert(collectionID)
 | 
			
		||||
		return true
 | 
			
		||||
	})
 | 
			
		||||
	if updateLimitErr != nil {
 | 
			
		||||
		return updateLimitErr
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	safeGetCollectionRate := func(label, dbName, collectionName string) float64 {
 | 
			
		||||
		if labelMetric, ok := collectionMetricMap[label]; ok {
 | 
			
		||||
			if dbMetric, ok := labelMetric[dbName]; ok {
 | 
			
		||||
				if rate, ok := dbMetric[collectionName]; ok {
 | 
			
		||||
					return rate
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return 0
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	coolOffCollectionID := func(collections ...int64) error {
 | 
			
		||||
		for _, collection := range collections {
 | 
			
		||||
			dbID, ok := q.collectionIDToDBID.Get(collection)
 | 
			
		||||
			if !ok {
 | 
			
		||||
				return fmt.Errorf("db ID not found of collection ID: %d", collection)
 | 
			
		||||
			}
 | 
			
		||||
			// skip this database because it has been denied access for reading
 | 
			
		||||
			_, ok = deniedDatabaseIDs[dbID]
 | 
			
		||||
			if ok {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collection)
 | 
			
		||||
			if collectionLimiter == nil {
 | 
			
		||||
				return fmt.Errorf("collection limiter not found: %d", collection)
 | 
			
		||||
			}
 | 
			
		||||
			dbName, ok := dbIDs[dbID]
 | 
			
		||||
			if !ok {
 | 
			
		||||
				return fmt.Errorf("db name not found of db ID: %d", dbID)
 | 
			
		||||
			}
 | 
			
		||||
			collectionName, ok := collectionIDs[collection]
 | 
			
		||||
			if !ok {
 | 
			
		||||
				return fmt.Errorf("collection name not found of collection ID: %d", collection)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			realTimeSearchRate := safeGetCollectionRate(internalpb.RateType_DQLSearch.String(), dbName, collectionName)
 | 
			
		||||
			realTimeQueryRate := safeGetCollectionRate(internalpb.RateType_DQLQuery.String(), dbName, collectionName)
 | 
			
		||||
			q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, collectionLimiter, log)
 | 
			
		||||
 | 
			
		||||
			collectionProps := q.getCollectionLimitProperties(collection)
 | 
			
		||||
			q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey),
 | 
			
		||||
				internalpb.RateType_DQLSearch, collectionLimiter)
 | 
			
		||||
			q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey),
 | 
			
		||||
				internalpb.RateType_DQLQuery, collectionLimiter)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if updateLimitErr = coolOffCollectionID(limitCollectionSet.Collect()...); updateLimitErr != nil {
 | 
			
		||||
		return updateLimitErr
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// calculateReadRates calculates and sets dql rates.
 | 
			
		||||
func (q *QuotaCenter) calculateReadRates() error {
 | 
			
		||||
	log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
 | 
			
		||||
| 
						 | 
				
			
			@ -927,86 +697,9 @@ func (q *QuotaCenter) calculateReadRates() error {
 | 
			
		|||
	if len(deniedDatabaseIDs) != 0 {
 | 
			
		||||
		q.forceDenyReading(commonpb.ErrorCode_ForceDeny, false, maps.Keys(deniedDatabaseIDs), log)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold.GetAsDuration(time.Second)
 | 
			
		||||
	limitCollectionSet := typeutil.NewUniqueSet()
 | 
			
		||||
 | 
			
		||||
	// enableQueueProtection && queueLatencyThreshold >= 0 means enable queue latency protection
 | 
			
		||||
	if queueLatencyThreshold >= 0 {
 | 
			
		||||
		for _, metric := range q.queryNodeMetrics {
 | 
			
		||||
			searchLatency := metric.SearchQueue.AvgQueueDuration
 | 
			
		||||
			queryLatency := metric.QueryQueue.AvgQueueDuration
 | 
			
		||||
			if searchLatency >= queueLatencyThreshold || queryLatency >= queueLatencyThreshold {
 | 
			
		||||
				limitCollectionSet.Insert(metric.Effect.CollectionIDs...)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// queue length
 | 
			
		||||
	enableQueueProtection := Params.QuotaConfig.QueueProtectionEnabled.GetAsBool()
 | 
			
		||||
	nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold.GetAsInt64()
 | 
			
		||||
	if enableQueueProtection && nqInQueueThreshold >= 0 {
 | 
			
		||||
		// >= 0 means enable queue length protection
 | 
			
		||||
		sum := func(ri metricsinfo.ReadInfoInQueue) int64 {
 | 
			
		||||
			return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan
 | 
			
		||||
		}
 | 
			
		||||
		for _, metric := range q.queryNodeMetrics {
 | 
			
		||||
			// We think of the NQ of query request as 1.
 | 
			
		||||
			// search use same queue length counter with query
 | 
			
		||||
			if sum(metric.SearchQueue) >= nqInQueueThreshold {
 | 
			
		||||
				limitCollectionSet.Insert(metric.Effect.CollectionIDs...)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	metricMap, collectionMetricMap := q.getReadRates()
 | 
			
		||||
	clusterLimit, limitDBNameSet, limitCollectionNameSet := q.getLimitedDBAndCollections(metricMap, collectionMetricMap)
 | 
			
		||||
 | 
			
		||||
	coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat()
 | 
			
		||||
 | 
			
		||||
	if clusterLimit {
 | 
			
		||||
		realTimeClusterSearchRate := metricMap[internalpb.RateType_DQLSearch.String()]
 | 
			
		||||
		realTimeClusterQueryRate := metricMap[internalpb.RateType_DQLQuery.String()]
 | 
			
		||||
		q.coolOffReading(realTimeClusterSearchRate, realTimeClusterQueryRate, coolOffSpeed, q.rateLimiter.GetRootLimiters(), log)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if updateLimitErr := q.coolOffDatabaseReading(deniedDatabaseIDs, limitDBNameSet, collectionMetricMap,
 | 
			
		||||
		log); updateLimitErr != nil {
 | 
			
		||||
		return updateLimitErr
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if updateLimitErr := q.coolOffCollectionReading(deniedDatabaseIDs, &limitCollectionSet, limitCollectionNameSet,
 | 
			
		||||
		collectionMetricMap, log); updateLimitErr != nil {
 | 
			
		||||
		return updateLimitErr
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *QuotaCenter) coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed float64,
 | 
			
		||||
	node *rlinternal.RateLimiterNode, mlog *log.MLogger,
 | 
			
		||||
) {
 | 
			
		||||
	limiter := node.GetLimiters()
 | 
			
		||||
 | 
			
		||||
	v, ok := limiter.Get(internalpb.RateType_DQLSearch)
 | 
			
		||||
	if ok && v.Limit() != Inf && realTimeSearchRate > 0 {
 | 
			
		||||
		v.SetLimit(Limit(realTimeSearchRate * coolOffSpeed))
 | 
			
		||||
		mlog.RatedWarn(10, "QuotaCenter cool read rates off done",
 | 
			
		||||
			zap.Any("level", node.Level()),
 | 
			
		||||
			zap.Any("id", node.GetID()),
 | 
			
		||||
			zap.Any("searchRate", v.Limit()))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	v, ok = limiter.Get(internalpb.RateType_DQLQuery)
 | 
			
		||||
	if ok && v.Limit() != Inf && realTimeQueryRate > 0 {
 | 
			
		||||
		v.SetLimit(Limit(realTimeQueryRate * coolOffSpeed))
 | 
			
		||||
		mlog.RatedWarn(10, "QuotaCenter cool read rates off done",
 | 
			
		||||
			zap.Any("level", node.Level()),
 | 
			
		||||
			zap.Any("id", node.GetID()),
 | 
			
		||||
			zap.Any("queryRate", v.Limit()))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *QuotaCenter) getDenyWritingDBs() map[int64]struct{} {
 | 
			
		||||
	dbIDs := make(map[int64]struct{})
 | 
			
		||||
	for _, dbID := range lo.Uniq(q.collectionIDToDBID.Values()) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -574,95 +574,13 @@ func TestQuotaCenter(t *testing.T) {
 | 
			
		|||
		quotaCenter.clearMetrics()
 | 
			
		||||
		quotaCenter.collectionIDToDBID = collectionIDToDBID
 | 
			
		||||
		quotaCenter.readableCollections = map[int64]map[int64][]int64{
 | 
			
		||||
			0: {1: {}, 2: {}, 3: {}},
 | 
			
		||||
			1: {4: {}},
 | 
			
		||||
			0: {1: {}},
 | 
			
		||||
			1: {2: {}},
 | 
			
		||||
		}
 | 
			
		||||
		quotaCenter.dbs.Insert("default", 0)
 | 
			
		||||
		quotaCenter.dbs.Insert("db1", 1)
 | 
			
		||||
		quotaCenter.collections.Insert("0.col1", 1)
 | 
			
		||||
		quotaCenter.collections.Insert("0.col2", 2)
 | 
			
		||||
		quotaCenter.collections.Insert("0.col3", 3)
 | 
			
		||||
		quotaCenter.collections.Insert("1.col4", 4)
 | 
			
		||||
 | 
			
		||||
		colSubLabel := ratelimitutil.GetCollectionSubLabel("default", "col1")
 | 
			
		||||
		quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
 | 
			
		||||
			1: {Rms: []metricsinfo.RateMetric{
 | 
			
		||||
				{Label: internalpb.RateType_DQLSearch.String(), Rate: 100},
 | 
			
		||||
				{Label: internalpb.RateType_DQLQuery.String(), Rate: 100},
 | 
			
		||||
				{Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLSearch.String(), colSubLabel), Rate: 100},
 | 
			
		||||
				{Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLQuery.String(), colSubLabel), Rate: 100},
 | 
			
		||||
			}},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.ForceDenyReading.Key, "false")
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.QueueProtectionEnabled.Key, "true")
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.QueueLatencyThreshold.Key, "100")
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.DQLLimitEnabled.Key, "true")
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.DQLMaxQueryRatePerCollection.Key, "500")
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.DQLMaxSearchRatePerCollection.Key, "500")
 | 
			
		||||
 | 
			
		||||
		checkLimiter := func() {
 | 
			
		||||
			for db, collections := range quotaCenter.readableCollections {
 | 
			
		||||
				for collection := range collections {
 | 
			
		||||
					if collection != 1 {
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
					limiters := quotaCenter.rateLimiter.GetCollectionLimiters(db, collection).GetLimiters()
 | 
			
		||||
					searchLimit, _ := limiters.Get(internalpb.RateType_DQLSearch)
 | 
			
		||||
					assert.Equal(t, Limit(100.0*0.9), searchLimit.Limit())
 | 
			
		||||
 | 
			
		||||
					queryLimit, _ := limiters.Get(internalpb.RateType_DQLQuery)
 | 
			
		||||
					assert.Equal(t, Limit(100.0*0.9), queryLimit.Limit())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		err := quotaCenter.resetAllCurrentRates()
 | 
			
		||||
		assert.NoError(t, err)
 | 
			
		||||
 | 
			
		||||
		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
 | 
			
		||||
			1: {SearchQueue: metricsinfo.ReadInfoInQueue{
 | 
			
		||||
				AvgQueueDuration: Params.QuotaConfig.QueueLatencyThreshold.GetAsDuration(time.Second),
 | 
			
		||||
			}, Effect: metricsinfo.NodeEffect{
 | 
			
		||||
				NodeID:        1,
 | 
			
		||||
				CollectionIDs: []int64{1, 2, 3},
 | 
			
		||||
			}},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		err = quotaCenter.calculateReadRates()
 | 
			
		||||
		assert.NoError(t, err)
 | 
			
		||||
		checkLimiter()
 | 
			
		||||
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.NQInQueueThreshold.Key, "100")
 | 
			
		||||
		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
 | 
			
		||||
			1: {
 | 
			
		||||
				SearchQueue: metricsinfo.ReadInfoInQueue{
 | 
			
		||||
					UnsolvedQueue: Params.QuotaConfig.NQInQueueThreshold.GetAsInt64(),
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
		err = quotaCenter.calculateReadRates()
 | 
			
		||||
		assert.NoError(t, err)
 | 
			
		||||
		checkLimiter()
 | 
			
		||||
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.ResultProtectionEnabled.Key, "true")
 | 
			
		||||
		paramtable.Get().Save(Params.QuotaConfig.MaxReadResultRate.Key, "1")
 | 
			
		||||
		quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
 | 
			
		||||
			1: {
 | 
			
		||||
				Rms: []metricsinfo.RateMetric{
 | 
			
		||||
					{Label: internalpb.RateType_DQLSearch.String(), Rate: 100},
 | 
			
		||||
					{Label: internalpb.RateType_DQLQuery.String(), Rate: 100},
 | 
			
		||||
					{Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLSearch.String(), colSubLabel), Rate: 100},
 | 
			
		||||
					{Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLQuery.String(), colSubLabel), Rate: 100},
 | 
			
		||||
					{Label: metricsinfo.ReadResultThroughput, Rate: 1.2},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{1: {SearchQueue: metricsinfo.ReadInfoInQueue{}}}
 | 
			
		||||
		err = quotaCenter.calculateReadRates()
 | 
			
		||||
		assert.NoError(t, err)
 | 
			
		||||
		checkLimiter()
 | 
			
		||||
 | 
			
		||||
		meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).Unset()
 | 
			
		||||
		meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).
 | 
			
		||||
			RunAndReturn(func(ctx context.Context, i int64, u uint64) (*model.Database, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -1582,174 +1500,6 @@ func TestGetRateType(t *testing.T) {
 | 
			
		|||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCalculateReadRates(t *testing.T) {
 | 
			
		||||
	paramtable.Init()
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	t.Run("cool off db", func(t *testing.T) {
 | 
			
		||||
		qc := mocks.NewMockQueryCoordClient(t)
 | 
			
		||||
		meta := mockrootcoord.NewIMetaTable(t)
 | 
			
		||||
		meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrDatabaseNotFound).Maybe()
 | 
			
		||||
 | 
			
		||||
		pcm := proxyutil.NewMockProxyClientManager(t)
 | 
			
		||||
		dc := mocks.NewMockDataCoordClient(t)
 | 
			
		||||
		core, _ := NewCore(ctx, nil)
 | 
			
		||||
		core.tsoAllocator = newMockTsoAllocator()
 | 
			
		||||
 | 
			
		||||
		meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(nil, errors.New("mock error"))
 | 
			
		||||
 | 
			
		||||
		Params.Save(Params.QuotaConfig.ForceDenyReading.Key, "false")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.ForceDenyReading.Key)
 | 
			
		||||
 | 
			
		||||
		Params.Save(Params.QuotaConfig.ResultProtectionEnabled.Key, "true")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.ResultProtectionEnabled.Key)
 | 
			
		||||
		Params.Save(Params.QuotaConfig.MaxReadResultRate.Key, "50")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.MaxReadResultRate.Key)
 | 
			
		||||
		Params.Save(Params.QuotaConfig.MaxReadResultRatePerDB.Key, "30")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.MaxReadResultRatePerDB.Key)
 | 
			
		||||
		Params.Save(Params.QuotaConfig.MaxReadResultRatePerCollection.Key, "20")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.MaxReadResultRatePerCollection.Key)
 | 
			
		||||
		Params.Save(Params.QuotaConfig.CoolOffSpeed.Key, "0.8")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.CoolOffSpeed.Key)
 | 
			
		||||
 | 
			
		||||
		Params.Save(Params.QuotaConfig.DQLLimitEnabled.Key, "true")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.DQLLimitEnabled.Key)
 | 
			
		||||
		Params.Save(Params.QuotaConfig.DQLMaxSearchRate.Key, "500")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.DQLMaxSearchRate.Key)
 | 
			
		||||
		Params.Save(Params.QuotaConfig.DQLMaxSearchRatePerDB.Key, "500")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.DQLMaxSearchRatePerDB.Key)
 | 
			
		||||
		Params.Save(Params.QuotaConfig.DQLMaxSearchRatePerCollection.Key, "500")
 | 
			
		||||
		defer Params.Reset(Params.QuotaConfig.DQLMaxSearchRatePerCollection.Key)
 | 
			
		||||
 | 
			
		||||
		quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
 | 
			
		||||
		quotaCenter.dbs = typeutil.NewConcurrentMap[string, int64]()
 | 
			
		||||
		quotaCenter.collections = typeutil.NewConcurrentMap[string, int64]()
 | 
			
		||||
		quotaCenter.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]()
 | 
			
		||||
		quotaCenter.dbs.Insert("default", 1)
 | 
			
		||||
		quotaCenter.dbs.Insert("test", 2)
 | 
			
		||||
		quotaCenter.collections.Insert("1.col1", 10)
 | 
			
		||||
		quotaCenter.collections.Insert("2.col2", 20)
 | 
			
		||||
		quotaCenter.collections.Insert("2.col3", 30)
 | 
			
		||||
		quotaCenter.collectionIDToDBID.Insert(10, 1)
 | 
			
		||||
		quotaCenter.collectionIDToDBID.Insert(20, 2)
 | 
			
		||||
		quotaCenter.collectionIDToDBID.Insert(30, 2)
 | 
			
		||||
 | 
			
		||||
		searchLabel := internalpb.RateType_DQLSearch.String()
 | 
			
		||||
		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{}
 | 
			
		||||
		quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
 | 
			
		||||
			1: {
 | 
			
		||||
				Rms: []metricsinfo.RateMetric{
 | 
			
		||||
					{
 | 
			
		||||
						Label: metricsinfo.ReadResultThroughput,
 | 
			
		||||
						Rate:  40 * 1024 * 1024,
 | 
			
		||||
					},
 | 
			
		||||
					//{
 | 
			
		||||
					//	Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")),
 | 
			
		||||
					//	Rate:  20 * 1024 * 1024,
 | 
			
		||||
					//},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("default", "col1")),
 | 
			
		||||
						Rate:  15 * 1024 * 1024,
 | 
			
		||||
					},
 | 
			
		||||
					//{
 | 
			
		||||
					//	Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("test")),
 | 
			
		||||
					//	Rate:  20 * 1024 * 1024,
 | 
			
		||||
					//},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("test", "col2")),
 | 
			
		||||
						Rate:  10 * 1024 * 1024,
 | 
			
		||||
					},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("test", "col3")),
 | 
			
		||||
						Rate:  10 * 1024 * 1024,
 | 
			
		||||
					},
 | 
			
		||||
					{
 | 
			
		||||
						Label: searchLabel,
 | 
			
		||||
						Rate:  20,
 | 
			
		||||
					},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("default")),
 | 
			
		||||
						Rate:  10,
 | 
			
		||||
					},
 | 
			
		||||
					//{
 | 
			
		||||
					//	Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("test")),
 | 
			
		||||
					//	Rate:  10,
 | 
			
		||||
					//},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("default", "col1")),
 | 
			
		||||
						Rate:  10,
 | 
			
		||||
					},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("test", "col2")),
 | 
			
		||||
						Rate:  5,
 | 
			
		||||
					},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("test", "col3")),
 | 
			
		||||
						Rate:  5,
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			2: {
 | 
			
		||||
				Rms: []metricsinfo.RateMetric{
 | 
			
		||||
					{
 | 
			
		||||
						Label: metricsinfo.ReadResultThroughput,
 | 
			
		||||
						Rate:  20 * 1024 * 1024,
 | 
			
		||||
					},
 | 
			
		||||
					//{
 | 
			
		||||
					//	Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")),
 | 
			
		||||
					//	Rate:  20 * 1024 * 1024,
 | 
			
		||||
					//},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("default", "col1")),
 | 
			
		||||
						Rate:  20 * 1024 * 1024,
 | 
			
		||||
					},
 | 
			
		||||
					{
 | 
			
		||||
						Label: searchLabel,
 | 
			
		||||
						Rate:  20,
 | 
			
		||||
					},
 | 
			
		||||
					//{
 | 
			
		||||
					//	Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("default")),
 | 
			
		||||
					//	Rate:  20,
 | 
			
		||||
					//},
 | 
			
		||||
					{
 | 
			
		||||
						Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("default", "col1")),
 | 
			
		||||
						Rate:  20,
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		quotaCenter.rateLimiter.GetRootLimiters().GetLimiters().Insert(internalpb.RateType_DQLSearch, ratelimitutil.NewLimiter(1000, 1000))
 | 
			
		||||
		quotaCenter.rateLimiter.GetOrCreateCollectionLimiters(1, 10,
 | 
			
		||||
			newParamLimiterFunc(internalpb.RateScope_Database, allOps),
 | 
			
		||||
			newParamLimiterFunc(internalpb.RateScope_Collection, allOps))
 | 
			
		||||
		quotaCenter.rateLimiter.GetOrCreateCollectionLimiters(2, 20,
 | 
			
		||||
			newParamLimiterFunc(internalpb.RateScope_Database, allOps),
 | 
			
		||||
			newParamLimiterFunc(internalpb.RateScope_Collection, allOps))
 | 
			
		||||
		quotaCenter.rateLimiter.GetOrCreateCollectionLimiters(2, 30,
 | 
			
		||||
			newParamLimiterFunc(internalpb.RateScope_Database, allOps),
 | 
			
		||||
			newParamLimiterFunc(internalpb.RateScope_Collection, allOps))
 | 
			
		||||
 | 
			
		||||
		err := quotaCenter.calculateReadRates()
 | 
			
		||||
		assert.NoError(t, err)
 | 
			
		||||
 | 
			
		||||
		checkRate := func(rateNode *interalratelimitutil.RateLimiterNode, expectValue float64) {
 | 
			
		||||
			searchRate, ok := rateNode.GetLimiters().Get(internalpb.RateType_DQLSearch)
 | 
			
		||||
			assert.True(t, ok)
 | 
			
		||||
			assert.EqualValues(t, expectValue, searchRate.Limit())
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			checkRate(quotaCenter.rateLimiter.GetRootLimiters(), float64(32))             // (20 + 20) * 0.8
 | 
			
		||||
			checkRate(quotaCenter.rateLimiter.GetDatabaseLimiters(1), float64(24))        // (20 + 10) * 0.8
 | 
			
		||||
			checkRate(quotaCenter.rateLimiter.GetDatabaseLimiters(2), float64(500))       // not cool off
 | 
			
		||||
			checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(1, 10), float64(24))  // (20 + 10) * 0.8
 | 
			
		||||
			checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(2, 20), float64(500)) // not cool off
 | 
			
		||||
			checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(2, 30), float64(500)) // not cool off
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestResetAllCurrentRates(t *testing.T) {
 | 
			
		||||
	paramtable.Init()
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,8 +17,6 @@
 | 
			
		|||
package metricsinfo
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/typeutil"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -26,18 +24,11 @@ import (
 | 
			
		|||
type RateMetricLabel = string
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	NQPerSecond             RateMetricLabel = "NQPerSecond"
 | 
			
		||||
	SearchThroughput        RateMetricLabel = "SearchThroughput"
 | 
			
		||||
	ReadResultThroughput    RateMetricLabel = "ReadResultThroughput"
 | 
			
		||||
	InsertConsumeThroughput RateMetricLabel = "InsertConsumeThroughput"
 | 
			
		||||
	DeleteConsumeThroughput RateMetricLabel = "DeleteConsumeThroughput"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	SearchQueueMetric string = "SearchQueue"
 | 
			
		||||
	QueryQueueMetric  string = "QueryQueue"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	UnsolvedQueueType string = "Unsolved"
 | 
			
		||||
	ReadyQueueType    string = "Ready"
 | 
			
		||||
| 
						 | 
				
			
			@ -58,15 +49,6 @@ type FlowGraphMetric struct {
 | 
			
		|||
	NumFlowGraph        int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadInfoInQueue contains NQ num or task num in QueryNode's task queue.
 | 
			
		||||
type ReadInfoInQueue struct {
 | 
			
		||||
	UnsolvedQueue    int64
 | 
			
		||||
	ReadyQueue       int64
 | 
			
		||||
	ReceiveChan      int64
 | 
			
		||||
	ExecuteChan      int64
 | 
			
		||||
	AvgQueueDuration time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NodeEffect contains the a node and its effected collection info.
 | 
			
		||||
type NodeEffect struct {
 | 
			
		||||
	NodeID        int64
 | 
			
		||||
| 
						 | 
				
			
			@ -78,8 +60,6 @@ type QueryNodeQuotaMetrics struct {
 | 
			
		|||
	Hms                 HardwareMetrics
 | 
			
		||||
	Rms                 []RateMetric
 | 
			
		||||
	Fgm                 FlowGraphMetric
 | 
			
		||||
	SearchQueue         ReadInfoInQueue
 | 
			
		||||
	QueryQueue          ReadInfoInQueue
 | 
			
		||||
	GrowingSegmentsSize int64
 | 
			
		||||
	Effect              NodeEffect
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,7 +19,6 @@ package paramtable
 | 
			
		|||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math"
 | 
			
		||||
	"strconv"
 | 
			
		||||
 | 
			
		||||
	"go.uber.org/zap"
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -157,15 +156,7 @@ type quotaConfig struct {
 | 
			
		|||
	L0SegmentRowCountHighWaterLevel      ParamItem `refreshable:"true"`
 | 
			
		||||
 | 
			
		||||
	// limit reading
 | 
			
		||||
	ForceDenyReading               ParamItem `refreshable:"true"`
 | 
			
		||||
	QueueProtectionEnabled         ParamItem `refreshable:"true"`
 | 
			
		||||
	NQInQueueThreshold             ParamItem `refreshable:"true"`
 | 
			
		||||
	QueueLatencyThreshold          ParamItem `refreshable:"true"`
 | 
			
		||||
	ResultProtectionEnabled        ParamItem `refreshable:"true"`
 | 
			
		||||
	MaxReadResultRate              ParamItem `refreshable:"true"`
 | 
			
		||||
	MaxReadResultRatePerDB         ParamItem `refreshable:"true"`
 | 
			
		||||
	MaxReadResultRatePerCollection ParamItem `refreshable:"true"`
 | 
			
		||||
	CoolOffSpeed                   ParamItem `refreshable:"true"`
 | 
			
		||||
	ForceDenyReading ParamItem `refreshable:"true"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *quotaConfig) init(base *BaseTable) {
 | 
			
		||||
| 
						 | 
				
			
			@ -1926,159 +1917,6 @@ specific conditions, such as collection has been dropped), ` + "true" + ` means
 | 
			
		|||
	}
 | 
			
		||||
	p.ForceDenyReading.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.QueueProtectionEnabled = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.queueProtection.enabled",
 | 
			
		||||
		Version:      "2.2.0",
 | 
			
		||||
		DefaultValue: "false",
 | 
			
		||||
		Export:       true,
 | 
			
		||||
	}
 | 
			
		||||
	p.QueueProtectionEnabled.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.NQInQueueThreshold = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.queueProtection.nqInQueueThreshold",
 | 
			
		||||
		Version:      "2.2.0",
 | 
			
		||||
		DefaultValue: strconv.FormatInt(math.MaxInt64, 10),
 | 
			
		||||
		Formatter: func(v string) string {
 | 
			
		||||
			if !p.QueueProtectionEnabled.GetAsBool() {
 | 
			
		||||
				return strconv.FormatInt(math.MaxInt64, 10)
 | 
			
		||||
			}
 | 
			
		||||
			threshold := getAsFloat(v)
 | 
			
		||||
			// [0, inf)
 | 
			
		||||
			if threshold < 0 {
 | 
			
		||||
				return strconv.FormatInt(math.MaxInt64, 10)
 | 
			
		||||
			}
 | 
			
		||||
			return v
 | 
			
		||||
		},
 | 
			
		||||
		Doc: `nqInQueueThreshold indicated that the system was under backpressure for Search/Query path.
 | 
			
		||||
If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off
 | 
			
		||||
until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1.
 | 
			
		||||
int, default no limit`,
 | 
			
		||||
		Export: true,
 | 
			
		||||
	}
 | 
			
		||||
	p.NQInQueueThreshold.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.QueueLatencyThreshold = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.queueProtection.queueLatencyThreshold",
 | 
			
		||||
		Version:      "2.2.0",
 | 
			
		||||
		DefaultValue: max,
 | 
			
		||||
		Formatter: func(v string) string {
 | 
			
		||||
			if !p.QueueProtectionEnabled.GetAsBool() {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			level := getAsFloat(v)
 | 
			
		||||
			// [0, inf)
 | 
			
		||||
			if level < 0 {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			return v
 | 
			
		||||
		},
 | 
			
		||||
		Doc: `queueLatencyThreshold indicated that the system was under backpressure for Search/Query path.
 | 
			
		||||
If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off
 | 
			
		||||
until the latency of queuing no longer exceeds queueLatencyThreshold.
 | 
			
		||||
The latency here refers to the averaged latency over a period of time.
 | 
			
		||||
milliseconds, default no limit`,
 | 
			
		||||
		Export: true,
 | 
			
		||||
	}
 | 
			
		||||
	p.QueueLatencyThreshold.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.ResultProtectionEnabled = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.resultProtection.enabled",
 | 
			
		||||
		Version:      "2.2.0",
 | 
			
		||||
		DefaultValue: "false",
 | 
			
		||||
		Export:       true,
 | 
			
		||||
	}
 | 
			
		||||
	p.ResultProtectionEnabled.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.MaxReadResultRate = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.resultProtection.maxReadResultRate",
 | 
			
		||||
		Version:      "2.2.0",
 | 
			
		||||
		DefaultValue: max,
 | 
			
		||||
		Formatter: func(v string) string {
 | 
			
		||||
			if !p.ResultProtectionEnabled.GetAsBool() {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			rate := getAsFloat(v)
 | 
			
		||||
			if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
 | 
			
		||||
				return fmt.Sprintf("%f", megaBytes2Bytes(rate))
 | 
			
		||||
			}
 | 
			
		||||
			// [0, inf)
 | 
			
		||||
			if rate < 0 {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			return v
 | 
			
		||||
		},
 | 
			
		||||
		Doc: `maxReadResultRate indicated that the system was under backpressure for Search/Query path.
 | 
			
		||||
If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off
 | 
			
		||||
until the read result rate no longer exceeds maxReadResultRate.
 | 
			
		||||
MB/s, default no limit`,
 | 
			
		||||
		Export: true,
 | 
			
		||||
	}
 | 
			
		||||
	p.MaxReadResultRate.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.MaxReadResultRatePerDB = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.resultProtection.maxReadResultRatePerDB",
 | 
			
		||||
		Version:      "2.4.1",
 | 
			
		||||
		DefaultValue: max,
 | 
			
		||||
		Formatter: func(v string) string {
 | 
			
		||||
			if !p.ResultProtectionEnabled.GetAsBool() {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			rate := getAsFloat(v)
 | 
			
		||||
			if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
 | 
			
		||||
				return fmt.Sprintf("%f", megaBytes2Bytes(rate))
 | 
			
		||||
			}
 | 
			
		||||
			// [0, inf)
 | 
			
		||||
			if rate < 0 {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			return v
 | 
			
		||||
		},
 | 
			
		||||
		Export: true,
 | 
			
		||||
	}
 | 
			
		||||
	p.MaxReadResultRatePerDB.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.MaxReadResultRatePerCollection = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.resultProtection.maxReadResultRatePerCollection",
 | 
			
		||||
		Version:      "2.4.1",
 | 
			
		||||
		DefaultValue: max,
 | 
			
		||||
		Formatter: func(v string) string {
 | 
			
		||||
			if !p.ResultProtectionEnabled.GetAsBool() {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			rate := getAsFloat(v)
 | 
			
		||||
			if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
 | 
			
		||||
				return fmt.Sprintf("%f", megaBytes2Bytes(rate))
 | 
			
		||||
			}
 | 
			
		||||
			// [0, inf)
 | 
			
		||||
			if rate < 0 {
 | 
			
		||||
				return max
 | 
			
		||||
			}
 | 
			
		||||
			return v
 | 
			
		||||
		},
 | 
			
		||||
		Export: true,
 | 
			
		||||
	}
 | 
			
		||||
	p.MaxReadResultRatePerCollection.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	const defaultSpeed = "0.9"
 | 
			
		||||
	p.CoolOffSpeed = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limitReading.coolOffSpeed",
 | 
			
		||||
		Version:      "2.2.0",
 | 
			
		||||
		DefaultValue: defaultSpeed,
 | 
			
		||||
		Formatter: func(v string) string {
 | 
			
		||||
			// (0, 1]
 | 
			
		||||
			speed := getAsFloat(v)
 | 
			
		||||
			if speed <= 0 || speed > 1 {
 | 
			
		||||
				// log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed))
 | 
			
		||||
				return defaultSpeed
 | 
			
		||||
			}
 | 
			
		||||
			return v
 | 
			
		||||
		},
 | 
			
		||||
		Doc: `colOffSpeed is the speed of search&query rates cool off.
 | 
			
		||||
(0, 1]`,
 | 
			
		||||
		Export: true,
 | 
			
		||||
	}
 | 
			
		||||
	p.CoolOffSpeed.Init(base.mgr)
 | 
			
		||||
 | 
			
		||||
	p.AllocRetryTimes = ParamItem{
 | 
			
		||||
		Key:          "quotaAndLimits.limits.allocRetryTimes",
 | 
			
		||||
		Version:      "2.4.0",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,7 +17,6 @@
 | 
			
		|||
package paramtable
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"math"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
| 
						 | 
				
			
			@ -206,12 +205,6 @@ func TestQuotaParam(t *testing.T) {
 | 
			
		|||
 | 
			
		||||
	t.Run("test limit reading", func(t *testing.T) {
 | 
			
		||||
		assert.False(t, qc.ForceDenyReading.GetAsBool())
 | 
			
		||||
		assert.Equal(t, false, qc.QueueProtectionEnabled.GetAsBool())
 | 
			
		||||
		assert.Equal(t, int64(math.MaxInt64), qc.NQInQueueThreshold.GetAsInt64())
 | 
			
		||||
		assert.Equal(t, defaultMax, qc.QueueLatencyThreshold.GetAsFloat())
 | 
			
		||||
		assert.Equal(t, false, qc.ResultProtectionEnabled.GetAsBool())
 | 
			
		||||
		assert.Equal(t, defaultMax, qc.MaxReadResultRate.GetAsFloat())
 | 
			
		||||
		assert.Equal(t, 0.9, qc.CoolOffSpeed.GetAsFloat())
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("test disk quota", func(t *testing.T) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue