From f98dbcf5bef020fa50f66a0e6990233cddf6a976 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Sun, 9 Oct 2022 14:30:58 +0800 Subject: [PATCH] Update quota params and disable limit for showCollection (#19641) Signed-off-by: bigsheeper Signed-off-by: bigsheeper --- configs/milvus.yaml | 38 ++++----- internal/proxy/multi_rate_limiter.go | 6 +- internal/proxy/rate_limit_interceptor.go | 24 ++---- internal/proxy/rate_limit_interceptor_test.go | 4 - internal/util/paramtable/quota_param.go | 80 +++++++++++-------- internal/util/paramtable/quota_param_test.go | 12 ++- 6 files changed, 82 insertions(+), 82 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index fc9f3cb337..31efb95566 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -377,43 +377,43 @@ quotaAndLimits: enabled: false # `true` to enable quota and limits, `false` to disable. # quotaCenterCollectInterval is the time interval that quotaCenter - # collects metrics from Query cluster and Data cluster. + # collects metrics from Proxies, Query cluster and Data cluster. quotaCenterCollectInterval: 3 # seconds, (0 ~ 65536) ddl: # ddl limit rates, default no limit. enabled: false - collectionRate: # requests per minute, default no limit, rate for CreateCollection, DropCollection, HasCollection, DescribeCollection, LoadCollection, ReleaseCollection - partitionRate: # requests per minute, default no limit, rate for CreatePartition, DropPartition, HasPartition, LoadPartition, ReleasePartition - indexRate: # requests per minute, default no limit, rate for CreateIndex, DropIndex, DescribeIndex - flushRate: # requests per minute, default no limit, rate for flush - compactionRate: # requests per minute, default no limit, rate for manualCompaction + collectionRate: # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection + partitionRate: # qps, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition + + indexRate: + enabled: false + max: # qps, default no limit, rate for CreateIndex, DropIndex + flushRate: + enabled: false + max: # qps, default no limit, rate for flush + compactionRate: + enabled: false + max: # qps, default no limit, rate for manualCompaction # dml limit rates, default no limit. - # The maximum rate will not be greater than `max`, - # and the rate after handling back pressure will not be less than `min`. + # The maximum rate will not be greater than `max`. dml: enabled: false insertRate: max: # MB/s, default no limit - min: # MB/s, default 0 deleteRate: max: # MB/s, default no limit - min: # MB/s, default 0 bulkLoadRate: # not support yet. TODO: limit bulkLoad rate max: # MB/s, default no limit - min: # MB/s, default 0 # dql limit rates, default no limit. - # The maximum rate will not be greater than `max`, - # and the rate after handling back pressure will not be less than `min`. + # The maximum rate will not be greater than `max`. dql: enabled: false searchRate: max: # vps (vectors per second), default no limit - min: # vps (vectors per second), default 0 queryRate: max: # qps, default no limit - min: # qps, default 0 # limitWriting decides whether dml requests are allowed. limitWriting: @@ -432,10 +432,10 @@ quotaAndLimits: # When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate; # When memory usage < memoryLowWaterLevel, no action. # memoryLowWaterLevel should be less than memoryHighWaterLevel. - dataNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in DataNodes - dataNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in DataNodes - queryNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in QueryNodes - queryNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in QueryNodes + dataNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in DataNodes + dataNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in DataNodes + queryNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in QueryNodes + queryNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in QueryNodes # limitReading decides whether dql requests are allowed. limitReading: diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index ee0e39090b..c811503527 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -106,11 +106,11 @@ func (rl *rateLimiter) registerLimiters() { case internalpb.RateType_DDLPartition: r = Params.QuotaConfig.DDLPartitionRate case internalpb.RateType_DDLIndex: - r = Params.QuotaConfig.DDLIndexRate + r = Params.QuotaConfig.MaxIndexRate case internalpb.RateType_DDLFlush: - r = Params.QuotaConfig.DDLFlushRate + r = Params.QuotaConfig.MaxFlushRate case internalpb.RateType_DDLCompaction: - r = Params.QuotaConfig.DDLCompactionRate + r = Params.QuotaConfig.MaxCompactionRate case internalpb.RateType_DMLInsert: r = Params.QuotaConfig.DMLMaxInsertRate case internalpb.RateType_DMLDelete: diff --git a/internal/proxy/rate_limit_interceptor.go b/internal/proxy/rate_limit_interceptor.go index 42d72f7cc5..736b054187 100644 --- a/internal/proxy/rate_limit_interceptor.go +++ b/internal/proxy/rate_limit_interceptor.go @@ -65,15 +65,15 @@ func getRequestInfo(req interface{}) (internalpb.RateType, int, error) { return internalpb.RateType_DQLSearch, int(r.GetNq()), nil case *milvuspb.QueryRequest: return internalpb.RateType_DQLQuery, 1, nil // think of the query request's nq as 1 - case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest, *milvuspb.HasCollectionRequest: + case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest: return internalpb.RateType_DDLCollection, 1, nil - case *milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest, *milvuspb.ShowCollectionsRequest: + case *milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest: return internalpb.RateType_DDLCollection, 1, nil - case *milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest, *milvuspb.HasPartitionRequest: + case *milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest: return internalpb.RateType_DDLPartition, 1, nil - case *milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest, *milvuspb.ShowPartitionsRequest: + case *milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest: return internalpb.RateType_DDLPartition, 1, nil - case *milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest, *milvuspb.DescribeIndexRequest: + case *milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest: return internalpb.RateType_DDLIndex, 1, nil case *milvuspb.FlushRequest: return internalpb.RateType_DDLFlush, 1, nil @@ -130,20 +130,6 @@ func getFailedResponse(req interface{}, code commonpb.ErrorCode, reason string) *milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest, *milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest: return failedStatus(code, reason), nil - case *milvuspb.HasCollectionRequest, *milvuspb.HasPartitionRequest: - return failedBoolResponse(code, reason), nil - case *milvuspb.ShowCollectionsRequest: - return &milvuspb.ShowCollectionsResponse{ - Status: failedStatus(code, reason), - }, nil - case *milvuspb.ShowPartitionsRequest: - return &milvuspb.ShowPartitionsResponse{ - Status: failedStatus(code, reason), - }, nil - case *milvuspb.DescribeIndexRequest: - return &milvuspb.DescribeIndexResponse{ - Status: failedStatus(code, reason), - }, nil case *milvuspb.FlushRequest: return &milvuspb.FlushResponse{ Status: failedStatus(code, reason), diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index f2d44afccf..c1090ce002 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -96,10 +96,6 @@ func TestRateLimitInterceptor(t *testing.T) { testGetFailedResponse(&milvuspb.SearchRequest{}) testGetFailedResponse(&milvuspb.QueryRequest{}) testGetFailedResponse(&milvuspb.CreateCollectionRequest{}) - testGetFailedResponse(&milvuspb.HasCollectionRequest{}) - testGetFailedResponse(&milvuspb.ShowCollectionsRequest{}) - testGetFailedResponse(&milvuspb.ShowPartitionsRequest{}) - testGetFailedResponse(&milvuspb.DescribeIndexRequest{}) testGetFailedResponse(&milvuspb.FlushRequest{}) testGetFailedResponse(&milvuspb.ManualCompactionRequest{}) diff --git a/internal/util/paramtable/quota_param.go b/internal/util/paramtable/quota_param.go index 948513864e..351eae836a 100644 --- a/internal/util/paramtable/quota_param.go +++ b/internal/util/paramtable/quota_param.go @@ -32,12 +32,9 @@ const ( // defaultMax is the default minimal rate. defaultMin = float64(0) // defaultLowWaterLevel is the default memory low water level. - defaultLowWaterLevel = float64(0.8) + defaultLowWaterLevel = float64(0.85) // defaultHighWaterLevel is the default memory low water level. - defaultHighWaterLevel = float64(0.9) - - // secondsPerMinute is used to convert minutes to seconds. - secondsPerMinute = 60.0 + defaultHighWaterLevel = float64(0.95) ) // quotaConfig is configuration for quota and limitations. @@ -52,9 +49,13 @@ type quotaConfig struct { DDLLimitEnabled bool DDLCollectionRate float64 DDLPartitionRate float64 - DDLIndexRate float64 - DDLFlushRate float64 - DDLCompactionRate float64 + + IndexLimitEnabled bool + MaxIndexRate float64 + FlushLimitEnabled bool + MaxFlushRate float64 + CompactionLimitEnabled bool + MaxCompactionRate float64 // dml DMLLimitEnabled bool @@ -103,9 +104,13 @@ func (p *quotaConfig) init(base *BaseTable) { p.initDDLLimitEnabled() p.initDDLCollectionRate() p.initDDLPartitionRate() - p.initDDLIndexRate() - p.initDDLFlushRate() - p.initDDLCompactionRate() + + p.initIndexLimitEnabled() + p.initMaxIndexRate() + p.initFlushLimitEnabled() + p.initMaxFlushRate() + p.initCompactionLimitEnabled() + p.initMaxCompactionRate() // dml p.initDMLLimitEnabled() @@ -167,7 +172,6 @@ func (p *quotaConfig) initDDLCollectionRate() { return } p.DDLCollectionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.collectionRate", defaultMax) - p.DDLCollectionRate /= secondsPerMinute // [0 ~ Inf) if p.DDLCollectionRate < 0 { p.DDLCollectionRate = defaultMax @@ -180,49 +184,57 @@ func (p *quotaConfig) initDDLPartitionRate() { return } p.DDLPartitionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.partitionRate", defaultMax) - p.DDLPartitionRate /= secondsPerMinute // [0 ~ Inf) if p.DDLPartitionRate < 0 { p.DDLPartitionRate = defaultMax } } -func (p *quotaConfig) initDDLIndexRate() { - if !p.DDLLimitEnabled { - p.DDLIndexRate = defaultMax +func (p *quotaConfig) initIndexLimitEnabled() { + p.IndexLimitEnabled = p.Base.ParseBool("quotaAndLimits.indexRate.enabled", false) +} + +func (p *quotaConfig) initMaxIndexRate() { + if !p.IndexLimitEnabled { + p.MaxIndexRate = defaultMax return } - p.DDLIndexRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.indexRate", defaultMax) - p.DDLIndexRate /= secondsPerMinute + p.MaxIndexRate = p.Base.ParseFloatWithDefault("quotaAndLimits.indexRate.max", defaultMax) // [0 ~ Inf) - if p.DDLIndexRate < 0 { - p.DDLIndexRate = defaultMax + if p.MaxIndexRate < 0 { + p.MaxIndexRate = defaultMax } } -func (p *quotaConfig) initDDLFlushRate() { - if !p.DDLLimitEnabled { - p.DDLFlushRate = defaultMax +func (p *quotaConfig) initFlushLimitEnabled() { + p.FlushLimitEnabled = p.Base.ParseBool("quotaAndLimits.flushRate.enabled", false) +} + +func (p *quotaConfig) initMaxFlushRate() { + if !p.FlushLimitEnabled { + p.MaxFlushRate = defaultMax return } - p.DDLFlushRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.flushRate", defaultMax) - p.DDLFlushRate /= secondsPerMinute + p.MaxFlushRate = p.Base.ParseFloatWithDefault("quotaAndLimits.flushRate.max", defaultMax) // [0 ~ Inf) - if p.DDLFlushRate < 0 { - p.DDLFlushRate = defaultMax + if p.MaxFlushRate < 0 { + p.MaxFlushRate = defaultMax } } -func (p *quotaConfig) initDDLCompactionRate() { - if !p.DDLLimitEnabled { - p.DDLCompactionRate = defaultMax +func (p *quotaConfig) initCompactionLimitEnabled() { + p.CompactionLimitEnabled = p.Base.ParseBool("quotaAndLimits.compactionRate.enabled", false) +} + +func (p *quotaConfig) initMaxCompactionRate() { + if !p.CompactionLimitEnabled { + p.MaxCompactionRate = defaultMax return } - p.DDLCompactionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.compactionRate", defaultMax) - p.DDLCompactionRate /= secondsPerMinute + p.MaxCompactionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.compactionRate.max", defaultMax) // [0 ~ Inf) - if p.DDLCompactionRate < 0 { - p.DDLCompactionRate = defaultMax + if p.MaxCompactionRate < 0 { + p.MaxCompactionRate = defaultMax } } diff --git a/internal/util/paramtable/quota_param_test.go b/internal/util/paramtable/quota_param_test.go index 3b10557eb6..737f7b3454 100644 --- a/internal/util/paramtable/quota_param_test.go +++ b/internal/util/paramtable/quota_param_test.go @@ -36,9 +36,15 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, false, qc.DDLLimitEnabled) assert.Equal(t, defaultMax, qc.DDLCollectionRate) assert.Equal(t, defaultMax, qc.DDLPartitionRate) - assert.Equal(t, defaultMax, qc.DDLIndexRate) - assert.Equal(t, defaultMax, qc.DDLFlushRate) - assert.Equal(t, defaultMax, qc.DDLCompactionRate) + }) + + t.Run("test functional params", func(t *testing.T) { + assert.Equal(t, false, qc.IndexLimitEnabled) + assert.Equal(t, defaultMax, qc.MaxIndexRate) + assert.Equal(t, false, qc.FlushLimitEnabled) + assert.Equal(t, defaultMax, qc.MaxFlushRate) + assert.Equal(t, false, qc.CompactionLimitEnabled) + assert.Equal(t, defaultMax, qc.MaxCompactionRate) }) t.Run("test dml", func(t *testing.T) {