mirror of https://github.com/milvus-io/milvus.git
Make ratelimiter's config refreshable (#21757)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/21874/head
parent
b12740e108
commit
53ae40b8c6
|
@ -32,12 +32,6 @@ type Source interface {
|
|||
Close()
|
||||
}
|
||||
|
||||
// EventHandler handles config change event
|
||||
type EventHandler interface {
|
||||
OnEvent(event *Event)
|
||||
GetIdentifier() string
|
||||
}
|
||||
|
||||
// EtcdInfo has attribute for config center source initialization
|
||||
type EtcdInfo struct {
|
||||
UseEmbed bool
|
||||
|
@ -90,3 +84,27 @@ func WithEnvSource(keyFormatter func(string) string) Option {
|
|||
options.EnvKeyFormatter = keyFormatter
|
||||
}
|
||||
}
|
||||
|
||||
// EventHandler handles config change event
|
||||
type EventHandler interface {
|
||||
OnEvent(event *Event)
|
||||
GetIdentifier() string
|
||||
}
|
||||
|
||||
type simpleHandler struct {
|
||||
identity string
|
||||
onEvent func(*Event)
|
||||
}
|
||||
|
||||
func (s *simpleHandler) GetIdentifier() string {
|
||||
return s.identity
|
||||
}
|
||||
|
||||
// OnEvent implements EventHandler
|
||||
func (s *simpleHandler) OnEvent(event *Event) {
|
||||
s.onEvent(event)
|
||||
}
|
||||
|
||||
func NewHandler(ident string, onEvent func(*Event)) EventHandler {
|
||||
return &simpleHandler{ident, onEvent}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package proxy
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -25,11 +26,13 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/config"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/ratelimitutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
var QuotaErrorString = map[commonpb.ErrorCode]string{
|
||||
|
@ -113,13 +116,13 @@ func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, codes []
|
|||
|
||||
// rateLimiter implements Limiter.
|
||||
type rateLimiter struct {
|
||||
limiters map[internalpb.RateType]*ratelimitutil.Limiter
|
||||
limiters *typeutil.ConcurrentMap[internalpb.RateType, *ratelimitutil.Limiter]
|
||||
}
|
||||
|
||||
// newRateLimiter returns a new RateLimiter.
|
||||
func newRateLimiter() *rateLimiter {
|
||||
rl := &rateLimiter{
|
||||
limiters: make(map[internalpb.RateType]*ratelimitutil.Limiter),
|
||||
limiters: typeutil.NewConcurrentMap[internalpb.RateType, *ratelimitutil.Limiter](),
|
||||
}
|
||||
rl.registerLimiters()
|
||||
return rl
|
||||
|
@ -128,14 +131,18 @@ func newRateLimiter() *rateLimiter {
|
|||
// limit returns true, the request will be rejected.
|
||||
// Otherwise, the request will pass.
|
||||
func (rl *rateLimiter) limit(rt internalpb.RateType, n int) (bool, float64) {
|
||||
return !rl.limiters[rt].AllowN(time.Now(), n), float64(rl.limiters[rt].Limit())
|
||||
limit, ok := rl.limiters.Get(rt)
|
||||
if !ok {
|
||||
return false, -1
|
||||
}
|
||||
return !limit.AllowN(time.Now(), n), float64(limit.Limit())
|
||||
}
|
||||
|
||||
// setRates sets new rates for the limiters.
|
||||
func (rl *rateLimiter) setRates(rates []*internalpb.Rate) error {
|
||||
for _, r := range rates {
|
||||
if _, ok := rl.limiters[r.GetRt()]; ok {
|
||||
rl.limiters[r.GetRt()].SetLimit(ratelimitutil.Limit(r.GetR()))
|
||||
if limit, ok := rl.limiters.Get(r.GetRt()); ok {
|
||||
limit.SetLimit(ratelimitutil.Limit(r.GetR()))
|
||||
metrics.SetRateGaugeByRateType(r.GetRt(), paramtable.GetNodeID(), r.GetR())
|
||||
} else {
|
||||
return fmt.Errorf("unregister rateLimiter for rateType %s", r.GetRt().String())
|
||||
|
@ -157,36 +164,56 @@ func (rl *rateLimiter) printRates(rates []*internalpb.Rate) {
|
|||
|
||||
// registerLimiters register limiter for all rate types.
|
||||
func (rl *rateLimiter) registerLimiters() {
|
||||
quotaConfig := &Params.QuotaConfig
|
||||
for rt := range internalpb.RateType_name {
|
||||
var r float64
|
||||
var r *paramtable.ParamItem
|
||||
switch internalpb.RateType(rt) {
|
||||
case internalpb.RateType_DDLCollection:
|
||||
r = Params.QuotaConfig.DDLCollectionRate.GetAsFloat()
|
||||
r = &quotaConfig.DDLCollectionRate
|
||||
case internalpb.RateType_DDLPartition:
|
||||
r = Params.QuotaConfig.DDLPartitionRate.GetAsFloat()
|
||||
r = &quotaConfig.DDLPartitionRate
|
||||
case internalpb.RateType_DDLIndex:
|
||||
r = Params.QuotaConfig.MaxIndexRate.GetAsFloat()
|
||||
r = &quotaConfig.MaxIndexRate
|
||||
case internalpb.RateType_DDLFlush:
|
||||
r = Params.QuotaConfig.MaxFlushRate.GetAsFloat()
|
||||
r = &quotaConfig.MaxFlushRate
|
||||
case internalpb.RateType_DDLCompaction:
|
||||
r = Params.QuotaConfig.MaxCompactionRate.GetAsFloat()
|
||||
r = &quotaConfig.MaxCompactionRate
|
||||
case internalpb.RateType_DMLInsert:
|
||||
r = Params.QuotaConfig.DMLMaxInsertRate.GetAsFloat()
|
||||
r = &quotaConfig.DMLMaxInsertRate
|
||||
case internalpb.RateType_DMLDelete:
|
||||
r = Params.QuotaConfig.DMLMaxDeleteRate.GetAsFloat()
|
||||
r = &quotaConfig.DMLMaxDeleteRate
|
||||
case internalpb.RateType_DMLBulkLoad:
|
||||
r = Params.QuotaConfig.DMLMaxBulkLoadRate.GetAsFloat()
|
||||
r = &quotaConfig.DMLMaxBulkLoadRate
|
||||
case internalpb.RateType_DQLSearch:
|
||||
r = Params.QuotaConfig.DQLMaxSearchRate.GetAsFloat()
|
||||
r = &quotaConfig.DQLMaxSearchRate
|
||||
case internalpb.RateType_DQLQuery:
|
||||
r = Params.QuotaConfig.DQLMaxQueryRate.GetAsFloat()
|
||||
r = &quotaConfig.DQLMaxQueryRate
|
||||
}
|
||||
limit := ratelimitutil.Limit(r)
|
||||
burst := r // use rate as burst, because Limiter is with punishment mechanism, burst is insignificant.
|
||||
rl.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(limit, burst)
|
||||
limit := ratelimitutil.Limit(r.GetAsFloat())
|
||||
burst := r.GetAsFloat() // use rate as burst, because Limiter is with punishment mechanism, burst is insignificant.
|
||||
rl.limiters.InsertIfNotPresent(internalpb.RateType(rt), ratelimitutil.NewLimiter(limit, burst))
|
||||
onEvent := func(rateType internalpb.RateType) func(*config.Event) {
|
||||
return func(event *config.Event) {
|
||||
f, err := strconv.ParseFloat(event.Value, 64)
|
||||
if err != nil {
|
||||
log.Info("Error format for rateLimit",
|
||||
zap.String("rateType", rateType.String()),
|
||||
zap.String("key", event.Key),
|
||||
zap.String("value", event.Value),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
limit, ok := rl.limiters.Get(rateType)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
limit.SetLimit(ratelimitutil.Limit(f))
|
||||
}
|
||||
}(internalpb.RateType(rt))
|
||||
paramtable.Get().Watch(r.Key, config.NewHandler(fmt.Sprintf("rateLimiter-%d", rt), onEvent))
|
||||
log.Info("RateLimiter register for rateType",
|
||||
zap.String("rateType", internalpb.RateType_name[rt]),
|
||||
zap.String("rate", ratelimitutil.Limit(r).String()),
|
||||
zap.String("rate", ratelimitutil.Limit(r.GetAsFloat()).String()),
|
||||
zap.String("burst", fmt.Sprintf("%v", burst)))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,12 +17,16 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/ratelimitutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -34,7 +38,7 @@ func TestMultiRateLimiter(t *testing.T) {
|
|||
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
|
||||
multiLimiter := NewMultiRateLimiter()
|
||||
for _, rt := range internalpb.RateType_value {
|
||||
multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)
|
||||
multiLimiter.globalRateLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1))
|
||||
}
|
||||
for _, rt := range internalpb.RateType_value {
|
||||
errCode := multiLimiter.Check(internalpb.RateType(rt), 1)
|
||||
|
@ -81,9 +85,8 @@ func TestMultiRateLimiter(t *testing.T) {
|
|||
func TestRateLimiter(t *testing.T) {
|
||||
t.Run("test limit", func(t *testing.T) {
|
||||
limiter := newRateLimiter()
|
||||
limiter.registerLimiters()
|
||||
for _, rt := range internalpb.RateType_value {
|
||||
limiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)
|
||||
limiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1))
|
||||
}
|
||||
for _, rt := range internalpb.RateType_value {
|
||||
ok, _ := limiter.limit(internalpb.RateType(rt), 1)
|
||||
|
@ -98,7 +101,7 @@ func TestRateLimiter(t *testing.T) {
|
|||
t.Run("test setRates", func(t *testing.T) {
|
||||
limiter := newRateLimiter()
|
||||
for _, rt := range internalpb.RateType_value {
|
||||
limiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)
|
||||
limiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1))
|
||||
}
|
||||
|
||||
zeroRates := make([]*internalpb.Rate, 0, len(internalpb.RateType_value))
|
||||
|
@ -116,4 +119,31 @@ func TestRateLimiter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("tests refresh rate by config", func(t *testing.T) {
|
||||
limiter := newRateLimiter()
|
||||
|
||||
etcdCli, _ := etcd.GetEtcdClient(
|
||||
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
|
||||
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
|
||||
Params.EtcdCfg.Endpoints.GetAsStrings(),
|
||||
Params.EtcdCfg.EtcdTLSCert.GetValue(),
|
||||
Params.EtcdCfg.EtcdTLSKey.GetValue(),
|
||||
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
|
||||
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
newRate := fmt.Sprintf("%.4f", rand.Float64())
|
||||
etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/collectionRate", newRate)
|
||||
etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/partitionRate", "invalid")
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
limit, _ := limiter.limiters.Get(internalpb.RateType_DDLCollection)
|
||||
return newRate == limit.Limit().String()
|
||||
}, 20*time.Second, time.Second)
|
||||
|
||||
limit, _ := limiter.limiters.Get(internalpb.RateType_DDLPartition)
|
||||
assert.Equal(t, "+inf", limit.Limit().String())
|
||||
})
|
||||
}
|
||||
|
|
|
@ -97,7 +97,7 @@ func (gp *BaseTable) init(refreshInterval int) {
|
|||
}
|
||||
gp.initConfigsFromLocal(refreshInterval)
|
||||
gp.initConfigsFromRemote(refreshInterval)
|
||||
gp.InitLogCfg()
|
||||
gp.initLog()
|
||||
}
|
||||
|
||||
func (gp *BaseTable) initConfigsFromLocal(refreshInterval int) {
|
||||
|
@ -203,8 +203,8 @@ func (gp *BaseTable) Reset(key string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// InitLogCfg init log of the base table
|
||||
func (gp *BaseTable) InitLogCfg() {
|
||||
// initLog init log of the base table
|
||||
func (gp *BaseTable) initLog() {
|
||||
gp.Log = log.Config{}
|
||||
format := gp.GetWithDefault("log.format", DefaultLogFormat)
|
||||
gp.Log.Format = format
|
||||
|
|
|
@ -143,6 +143,10 @@ func (p *ComponentParam) GetAll() map[string]string {
|
|||
return p.mgr.GetConfigs()
|
||||
}
|
||||
|
||||
func (p *ComponentParam) Watch(key string, watcher config.EventHandler) {
|
||||
p.mgr.Dispatcher.Register(key, watcher)
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
// --- common ---
|
||||
type commonConfig struct {
|
||||
|
|
|
@ -48,33 +48,33 @@ type quotaConfig struct {
|
|||
|
||||
// ddl
|
||||
DDLLimitEnabled ParamItem `refreshable:"true"`
|
||||
DDLCollectionRate ParamItem `refreshable:"false"`
|
||||
DDLPartitionRate ParamItem `refreshable:"false"`
|
||||
DDLCollectionRate ParamItem `refreshable:"true"`
|
||||
DDLPartitionRate ParamItem `refreshable:"true"`
|
||||
|
||||
IndexLimitEnabled ParamItem `refreshable:"true"`
|
||||
MaxIndexRate ParamItem `refreshable:"false"`
|
||||
MaxIndexRate ParamItem `refreshable:"true"`
|
||||
|
||||
FlushLimitEnabled ParamItem `refreshable:"true"`
|
||||
MaxFlushRate ParamItem `refreshable:"false"`
|
||||
MaxFlushRate ParamItem `refreshable:"true"`
|
||||
|
||||
CompactionLimitEnabled ParamItem `refreshable:"true"`
|
||||
MaxCompactionRate ParamItem `refreshable:"false"`
|
||||
MaxCompactionRate ParamItem `refreshable:"true"`
|
||||
|
||||
// dml
|
||||
DMLLimitEnabled ParamItem `refreshable:"true"`
|
||||
DMLMaxInsertRate ParamItem `refreshable:"false"`
|
||||
DMLMinInsertRate ParamItem `refreshable:"false"`
|
||||
DMLMaxDeleteRate ParamItem `refreshable:"false"`
|
||||
DMLMinDeleteRate ParamItem `refreshable:"false"`
|
||||
DMLMaxBulkLoadRate ParamItem `refreshable:"false"`
|
||||
DMLMinBulkLoadRate ParamItem `refreshable:"false"`
|
||||
DMLMaxInsertRate ParamItem `refreshable:"true"`
|
||||
DMLMinInsertRate ParamItem `refreshable:"true"`
|
||||
DMLMaxDeleteRate ParamItem `refreshable:"true"`
|
||||
DMLMinDeleteRate ParamItem `refreshable:"true"`
|
||||
DMLMaxBulkLoadRate ParamItem `refreshable:"true"`
|
||||
DMLMinBulkLoadRate ParamItem `refreshable:"true"`
|
||||
|
||||
// dql
|
||||
DQLLimitEnabled ParamItem `refreshable:"true"`
|
||||
DQLMaxSearchRate ParamItem `refreshable:"false"`
|
||||
DQLMinSearchRate ParamItem `refreshable:"false"`
|
||||
DQLMaxQueryRate ParamItem `refreshable:"false"`
|
||||
DQLMinQueryRate ParamItem `refreshable:"false"`
|
||||
DQLMaxSearchRate ParamItem `refreshable:"true"`
|
||||
DQLMinSearchRate ParamItem `refreshable:"true"`
|
||||
DQLMaxQueryRate ParamItem `refreshable:"true"`
|
||||
DQLMinQueryRate ParamItem `refreshable:"true"`
|
||||
|
||||
// limits
|
||||
MaxCollectionNum ParamItem `refreshable:"true"`
|
||||
|
|
|
@ -58,6 +58,16 @@ func (m *ConcurrentMap[K, V]) InsertIfNotPresent(key K, value V) {
|
|||
}
|
||||
}
|
||||
|
||||
// Insert inserts the key-value pair to the concurrent map
|
||||
func (m *ConcurrentMap[K, V]) Insert(key K, value V) {
|
||||
_, loaded := m.inner.LoadOrStore(key, value)
|
||||
if !loaded {
|
||||
m.len.Inc()
|
||||
} else {
|
||||
m.inner.Store(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
|
||||
var zeroValue V
|
||||
value, ok := m.inner.Load(key)
|
||||
|
|
Loading…
Reference in New Issue