enhance: enable flush rate limiter of collection level (#33864)

pr: #33837

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/33878/head
jaime 2024-06-14 16:27:57 +08:00 committed by GitHub
parent 5b847e93fc
commit fd1c7b1a1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 25 additions and 13 deletions

View File

@ -647,10 +647,10 @@ quotaAndLimits:
db:
max: -1 # qps of db level, default no limit, rate for CreateIndex, DropIndex
flushRate:
enabled: false
enabled: true
max: -1 # qps, default no limit, rate for flush
collection:
max: -1 # qps, default no limit, rate for flush at collection level.
max: 0.1 # qps, default no limit, rate for flush at collection level.
db:
max: -1 # qps of db level, default no limit, rate for flush
compactionRate:

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
@ -116,6 +117,16 @@ func TestSimpleRateLimiter(t *testing.T) {
}
for _, rt := range internalpb.RateType_value {
if internalpb.RateType_DDLFlush == internalpb.RateType(rt) {
// the flush request has 0.1 rate limiter that means only allow to execute one request each 10 seconds.
time.Sleep(10 * time.Second)
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
continue
}
if IsDDLRequest(internalpb.RateType(rt)) {
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
@ -123,14 +134,15 @@ func TestSimpleRateLimiter(t *testing.T) {
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 5)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
} else {
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
continue
}
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
}
Params.Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak)
})

View File

@ -327,7 +327,7 @@ seconds, (0 ~ 65536)`,
p.FlushLimitEnabled = ParamItem{
Key: "quotaAndLimits.flushRate.enabled",
Version: "2.2.0",
DefaultValue: "false",
DefaultValue: "true",
Export: true,
}
p.FlushLimitEnabled.Init(base.mgr)
@ -373,7 +373,7 @@ seconds, (0 ~ 65536)`,
p.MaxFlushRatePerCollection = ParamItem{
Key: "quotaAndLimits.flushRate.collection.max",
Version: "2.3.9",
DefaultValue: "-1",
DefaultValue: "0.1",
Formatter: func(v string) string {
if !p.FlushLimitEnabled.GetAsBool() {
return max

View File

@ -41,8 +41,8 @@ func TestQuotaParam(t *testing.T) {
t.Run("test functional params", func(t *testing.T) {
assert.Equal(t, false, qc.IndexLimitEnabled.GetAsBool())
assert.Equal(t, defaultMax, qc.MaxIndexRate.GetAsFloat())
assert.False(t, qc.FlushLimitEnabled.GetAsBool())
assert.Equal(t, defaultMax, qc.MaxFlushRatePerCollection.GetAsFloat())
assert.True(t, qc.FlushLimitEnabled.GetAsBool())
assert.Equal(t, 0.1, qc.MaxFlushRatePerCollection.GetAsFloat())
assert.Equal(t, defaultMax, qc.MaxFlushRate.GetAsFloat())
assert.Equal(t, false, qc.CompactionLimitEnabled.GetAsBool())
assert.Equal(t, defaultMax, qc.MaxCompactionRate.GetAsFloat())