From 53ae40b8c6554a16d8c9feaea7f06d845d7821a3 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Wed, 1 Feb 2023 16:03:51 +0800 Subject: [PATCH] Make ratelimiter's config refreshable (#21757) Signed-off-by: Enwei Jiao --- internal/config/source.go | 30 +++++++-- internal/proxy/multi_rate_limiter.go | 67 +++++++++++++++------ internal/proxy/multi_rate_limiter_test.go | 38 ++++++++++-- internal/util/paramtable/base_table.go | 6 +- internal/util/paramtable/component_param.go | 4 ++ internal/util/paramtable/quota_param.go | 30 ++++----- internal/util/typeutil/map.go | 10 +++ 7 files changed, 137 insertions(+), 48 deletions(-) diff --git a/internal/config/source.go b/internal/config/source.go index ead2319b06..a67fcbd128 100644 --- a/internal/config/source.go +++ b/internal/config/source.go @@ -32,12 +32,6 @@ type Source interface { Close() } -// EventHandler handles config change event -type EventHandler interface { - OnEvent(event *Event) - GetIdentifier() string -} - // EtcdInfo has attribute for config center source initialization type EtcdInfo struct { UseEmbed bool @@ -90,3 +84,27 @@ func WithEnvSource(keyFormatter func(string) string) Option { options.EnvKeyFormatter = keyFormatter } } + +// EventHandler handles config change event +type EventHandler interface { + OnEvent(event *Event) + GetIdentifier() string +} + +type simpleHandler struct { + identity string + onEvent func(*Event) +} + +func (s *simpleHandler) GetIdentifier() string { + return s.identity +} + +// OnEvent implements EventHandler +func (s *simpleHandler) OnEvent(event *Event) { + s.onEvent(event) +} + +func NewHandler(ident string, onEvent func(*Event)) EventHandler { + return &simpleHandler{ident, onEvent} +} diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index 6c2ef9cb1a..0ccb0cb5ed 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -18,6 +18,7 @@ package proxy import ( "fmt" + "strconv" "sync" "time" @@ -25,11 +26,13 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus/internal/config" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/ratelimitutil" + "github.com/milvus-io/milvus/internal/util/typeutil" ) var QuotaErrorString = map[commonpb.ErrorCode]string{ @@ -113,13 +116,13 @@ func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, codes [] // rateLimiter implements Limiter. type rateLimiter struct { - limiters map[internalpb.RateType]*ratelimitutil.Limiter + limiters *typeutil.ConcurrentMap[internalpb.RateType, *ratelimitutil.Limiter] } // newRateLimiter returns a new RateLimiter. func newRateLimiter() *rateLimiter { rl := &rateLimiter{ - limiters: make(map[internalpb.RateType]*ratelimitutil.Limiter), + limiters: typeutil.NewConcurrentMap[internalpb.RateType, *ratelimitutil.Limiter](), } rl.registerLimiters() return rl @@ -128,14 +131,18 @@ func newRateLimiter() *rateLimiter { // limit returns true, the request will be rejected. // Otherwise, the request will pass. func (rl *rateLimiter) limit(rt internalpb.RateType, n int) (bool, float64) { - return !rl.limiters[rt].AllowN(time.Now(), n), float64(rl.limiters[rt].Limit()) + limit, ok := rl.limiters.Get(rt) + if !ok { + return false, -1 + } + return !limit.AllowN(time.Now(), n), float64(limit.Limit()) } // setRates sets new rates for the limiters. func (rl *rateLimiter) setRates(rates []*internalpb.Rate) error { for _, r := range rates { - if _, ok := rl.limiters[r.GetRt()]; ok { - rl.limiters[r.GetRt()].SetLimit(ratelimitutil.Limit(r.GetR())) + if limit, ok := rl.limiters.Get(r.GetRt()); ok { + limit.SetLimit(ratelimitutil.Limit(r.GetR())) metrics.SetRateGaugeByRateType(r.GetRt(), paramtable.GetNodeID(), r.GetR()) } else { return fmt.Errorf("unregister rateLimiter for rateType %s", r.GetRt().String()) @@ -157,36 +164,56 @@ func (rl *rateLimiter) printRates(rates []*internalpb.Rate) { // registerLimiters register limiter for all rate types. func (rl *rateLimiter) registerLimiters() { + quotaConfig := &Params.QuotaConfig for rt := range internalpb.RateType_name { - var r float64 + var r *paramtable.ParamItem switch internalpb.RateType(rt) { case internalpb.RateType_DDLCollection: - r = Params.QuotaConfig.DDLCollectionRate.GetAsFloat() + r = "aConfig.DDLCollectionRate case internalpb.RateType_DDLPartition: - r = Params.QuotaConfig.DDLPartitionRate.GetAsFloat() + r = "aConfig.DDLPartitionRate case internalpb.RateType_DDLIndex: - r = Params.QuotaConfig.MaxIndexRate.GetAsFloat() + r = "aConfig.MaxIndexRate case internalpb.RateType_DDLFlush: - r = Params.QuotaConfig.MaxFlushRate.GetAsFloat() + r = "aConfig.MaxFlushRate case internalpb.RateType_DDLCompaction: - r = Params.QuotaConfig.MaxCompactionRate.GetAsFloat() + r = "aConfig.MaxCompactionRate case internalpb.RateType_DMLInsert: - r = Params.QuotaConfig.DMLMaxInsertRate.GetAsFloat() + r = "aConfig.DMLMaxInsertRate case internalpb.RateType_DMLDelete: - r = Params.QuotaConfig.DMLMaxDeleteRate.GetAsFloat() + r = "aConfig.DMLMaxDeleteRate case internalpb.RateType_DMLBulkLoad: - r = Params.QuotaConfig.DMLMaxBulkLoadRate.GetAsFloat() + r = "aConfig.DMLMaxBulkLoadRate case internalpb.RateType_DQLSearch: - r = Params.QuotaConfig.DQLMaxSearchRate.GetAsFloat() + r = "aConfig.DQLMaxSearchRate case internalpb.RateType_DQLQuery: - r = Params.QuotaConfig.DQLMaxQueryRate.GetAsFloat() + r = "aConfig.DQLMaxQueryRate } - limit := ratelimitutil.Limit(r) - burst := r // use rate as burst, because Limiter is with punishment mechanism, burst is insignificant. - rl.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(limit, burst) + limit := ratelimitutil.Limit(r.GetAsFloat()) + burst := r.GetAsFloat() // use rate as burst, because Limiter is with punishment mechanism, burst is insignificant. + rl.limiters.InsertIfNotPresent(internalpb.RateType(rt), ratelimitutil.NewLimiter(limit, burst)) + onEvent := func(rateType internalpb.RateType) func(*config.Event) { + return func(event *config.Event) { + f, err := strconv.ParseFloat(event.Value, 64) + if err != nil { + log.Info("Error format for rateLimit", + zap.String("rateType", rateType.String()), + zap.String("key", event.Key), + zap.String("value", event.Value), + zap.Error(err)) + return + } + limit, ok := rl.limiters.Get(rateType) + if !ok { + return + } + limit.SetLimit(ratelimitutil.Limit(f)) + } + }(internalpb.RateType(rt)) + paramtable.Get().Watch(r.Key, config.NewHandler(fmt.Sprintf("rateLimiter-%d", rt), onEvent)) log.Info("RateLimiter register for rateType", zap.String("rateType", internalpb.RateType_name[rt]), - zap.String("rate", ratelimitutil.Limit(r).String()), + zap.String("rate", ratelimitutil.Limit(r.GetAsFloat()).String()), zap.String("burst", fmt.Sprintf("%v", burst))) } } diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index 2c1e310821..538dbbad14 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -17,12 +17,16 @@ package proxy import ( + "context" "fmt" "math" + "math/rand" "testing" + "time" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/ratelimitutil" "github.com/stretchr/testify/assert" @@ -34,7 +38,7 @@ func TestMultiRateLimiter(t *testing.T) { paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true") multiLimiter := NewMultiRateLimiter() for _, rt := range internalpb.RateType_value { - multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1) + multiLimiter.globalRateLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)) } for _, rt := range internalpb.RateType_value { errCode := multiLimiter.Check(internalpb.RateType(rt), 1) @@ -81,9 +85,8 @@ func TestMultiRateLimiter(t *testing.T) { func TestRateLimiter(t *testing.T) { t.Run("test limit", func(t *testing.T) { limiter := newRateLimiter() - limiter.registerLimiters() for _, rt := range internalpb.RateType_value { - limiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1) + limiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)) } for _, rt := range internalpb.RateType_value { ok, _ := limiter.limit(internalpb.RateType(rt), 1) @@ -98,7 +101,7 @@ func TestRateLimiter(t *testing.T) { t.Run("test setRates", func(t *testing.T) { limiter := newRateLimiter() for _, rt := range internalpb.RateType_value { - limiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1) + limiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)) } zeroRates := make([]*internalpb.Rate, 0, len(internalpb.RateType_value)) @@ -116,4 +119,31 @@ func TestRateLimiter(t *testing.T) { } } }) + + t.Run("tests refresh rate by config", func(t *testing.T) { + limiter := newRateLimiter() + + etcdCli, _ := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + newRate := fmt.Sprintf("%.4f", rand.Float64()) + etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/collectionRate", newRate) + etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/partitionRate", "invalid") + + assert.Eventually(t, func() bool { + limit, _ := limiter.limiters.Get(internalpb.RateType_DDLCollection) + return newRate == limit.Limit().String() + }, 20*time.Second, time.Second) + + limit, _ := limiter.limiters.Get(internalpb.RateType_DDLPartition) + assert.Equal(t, "+inf", limit.Limit().String()) + }) } diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 263256482e..45ab2811e7 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -97,7 +97,7 @@ func (gp *BaseTable) init(refreshInterval int) { } gp.initConfigsFromLocal(refreshInterval) gp.initConfigsFromRemote(refreshInterval) - gp.InitLogCfg() + gp.initLog() } func (gp *BaseTable) initConfigsFromLocal(refreshInterval int) { @@ -203,8 +203,8 @@ func (gp *BaseTable) Reset(key string) error { return nil } -// InitLogCfg init log of the base table -func (gp *BaseTable) InitLogCfg() { +// initLog init log of the base table +func (gp *BaseTable) initLog() { gp.Log = log.Config{} format := gp.GetWithDefault("log.format", DefaultLogFormat) gp.Log.Format = format diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 0388305513..6644718ffa 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -143,6 +143,10 @@ func (p *ComponentParam) GetAll() map[string]string { return p.mgr.GetConfigs() } +func (p *ComponentParam) Watch(key string, watcher config.EventHandler) { + p.mgr.Dispatcher.Register(key, watcher) +} + // ///////////////////////////////////////////////////////////////////////////// // --- common --- type commonConfig struct { diff --git a/internal/util/paramtable/quota_param.go b/internal/util/paramtable/quota_param.go index 8be4896f79..47f1cabf88 100644 --- a/internal/util/paramtable/quota_param.go +++ b/internal/util/paramtable/quota_param.go @@ -48,33 +48,33 @@ type quotaConfig struct { // ddl DDLLimitEnabled ParamItem `refreshable:"true"` - DDLCollectionRate ParamItem `refreshable:"false"` - DDLPartitionRate ParamItem `refreshable:"false"` + DDLCollectionRate ParamItem `refreshable:"true"` + DDLPartitionRate ParamItem `refreshable:"true"` IndexLimitEnabled ParamItem `refreshable:"true"` - MaxIndexRate ParamItem `refreshable:"false"` + MaxIndexRate ParamItem `refreshable:"true"` FlushLimitEnabled ParamItem `refreshable:"true"` - MaxFlushRate ParamItem `refreshable:"false"` + MaxFlushRate ParamItem `refreshable:"true"` CompactionLimitEnabled ParamItem `refreshable:"true"` - MaxCompactionRate ParamItem `refreshable:"false"` + MaxCompactionRate ParamItem `refreshable:"true"` // dml DMLLimitEnabled ParamItem `refreshable:"true"` - DMLMaxInsertRate ParamItem `refreshable:"false"` - DMLMinInsertRate ParamItem `refreshable:"false"` - DMLMaxDeleteRate ParamItem `refreshable:"false"` - DMLMinDeleteRate ParamItem `refreshable:"false"` - DMLMaxBulkLoadRate ParamItem `refreshable:"false"` - DMLMinBulkLoadRate ParamItem `refreshable:"false"` + DMLMaxInsertRate ParamItem `refreshable:"true"` + DMLMinInsertRate ParamItem `refreshable:"true"` + DMLMaxDeleteRate ParamItem `refreshable:"true"` + DMLMinDeleteRate ParamItem `refreshable:"true"` + DMLMaxBulkLoadRate ParamItem `refreshable:"true"` + DMLMinBulkLoadRate ParamItem `refreshable:"true"` // dql DQLLimitEnabled ParamItem `refreshable:"true"` - DQLMaxSearchRate ParamItem `refreshable:"false"` - DQLMinSearchRate ParamItem `refreshable:"false"` - DQLMaxQueryRate ParamItem `refreshable:"false"` - DQLMinQueryRate ParamItem `refreshable:"false"` + DQLMaxSearchRate ParamItem `refreshable:"true"` + DQLMinSearchRate ParamItem `refreshable:"true"` + DQLMaxQueryRate ParamItem `refreshable:"true"` + DQLMinQueryRate ParamItem `refreshable:"true"` // limits MaxCollectionNum ParamItem `refreshable:"true"` diff --git a/internal/util/typeutil/map.go b/internal/util/typeutil/map.go index 49c67de488..41a839e222 100644 --- a/internal/util/typeutil/map.go +++ b/internal/util/typeutil/map.go @@ -58,6 +58,16 @@ func (m *ConcurrentMap[K, V]) InsertIfNotPresent(key K, value V) { } } +// Insert inserts the key-value pair to the concurrent map +func (m *ConcurrentMap[K, V]) Insert(key K, value V) { + _, loaded := m.inner.LoadOrStore(key, value) + if !loaded { + m.len.Inc() + } else { + m.inner.Store(key, value) + } +} + func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) { var zeroValue V value, ok := m.inner.Load(key)