diff --git a/internal/proxy/simple_rate_limiter.go b/internal/proxy/simple_rate_limiter.go index 97fd8c9c2a..120bf36036 100644 --- a/internal/proxy/simple_rate_limiter.go +++ b/internal/proxy/simple_rate_limiter.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/util/quota" rlinternal "github.com/milvus-io/milvus/internal/util/ratelimitutil" - "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -168,6 +167,28 @@ func (m *SimpleLimiter) GetQuotaStates() ([]milvuspb.QuotaState, []string) { func (m *SimpleLimiter) SetRates(rootLimiter *proxypb.LimiterNode) error { m.quotaStatesMu.Lock() defer m.quotaStatesMu.Unlock() + + // Reset the limiter rates due to potential changes in configurations. + var ( + clusterConfigs = getDefaultLimiterConfig(internalpb.RateScope_Cluster) + databaseConfigs = getDefaultLimiterConfig(internalpb.RateScope_Database) + collectionConfigs = getDefaultLimiterConfig(internalpb.RateScope_Collection) + partitionConfigs = getDefaultLimiterConfig(internalpb.RateScope_Partition) + ) + initLimiter(m.rateLimiter.GetRootLimiters(), clusterConfigs) + m.rateLimiter.GetRootLimiters().GetChildren().Range(func(_ int64, dbLimiter *rlinternal.RateLimiterNode) bool { + initLimiter(dbLimiter, databaseConfigs) + dbLimiter.GetChildren().Range(func(_ int64, collLimiter *rlinternal.RateLimiterNode) bool { + initLimiter(collLimiter, collectionConfigs) + collLimiter.GetChildren().Range(func(_ int64, partitionLimiter *rlinternal.RateLimiterNode) bool { + initLimiter(partitionLimiter, partitionConfigs) + return true + }) + return true + }) + return true + }) + if err := m.updateRateLimiter(rootLimiter); err != nil { return err } @@ -182,26 +203,6 @@ func initLimiter(rln *rlinternal.RateLimiterNode, rateLimiterConfigs map[interna limit := ratelimitutil.Limit(p.GetAsFloat()) burst := p.GetAsFloat() // use rate as burst, because SimpleLimiter is with punishment mechanism, burst is insignificant. rln.GetLimiters().GetOrInsert(rt, ratelimitutil.NewLimiter(limit, burst)) - onEvent := func(rateType internalpb.RateType, formatFunc func(originValue string) string) func(*config.Event) { - return func(event *config.Event) { - f, err := strconv.ParseFloat(formatFunc(event.Value), 64) - if err != nil { - log.Info("Error format for rateLimit", - zap.String("rateType", rateType.String()), - zap.String("key", event.Key), - zap.String("value", event.Value), - zap.Error(err)) - return - } - l, ok := rln.GetLimiters().Get(rateType) - if !ok { - log.Info("rateLimiter not found for rateType", zap.String("rateType", rateType.String())) - return - } - l.SetLimit(ratelimitutil.Limit(f)) - } - }(rt, p.Formatter) - paramtable.Get().Watch(p.Key, config.NewHandler(fmt.Sprintf("rateLimiter-%d", rt), onEvent)) log.RatedDebug(30, "RateLimiter register for rateType", zap.String("rateType", internalpb.RateType_name[(int32(rt))]), zap.String("rateLimit", ratelimitutil.Limit(p.GetAsFloat()).String()), @@ -235,8 +236,8 @@ func newCollectionLimiters() *rlinternal.RateLimiterNode { func newPartitionLimiters() *rlinternal.RateLimiterNode { partRateLimiters := rlinternal.NewRateLimiterNode(internalpb.RateScope_Partition) - collectionLimiterConfigs := getDefaultLimiterConfig(internalpb.RateScope_Partition) - initLimiter(partRateLimiters, collectionLimiterConfigs) + partitionLimiterConfigs := getDefaultLimiterConfig(internalpb.RateScope_Partition) + initLimiter(partRateLimiters, partitionLimiterConfigs) return partRateLimiters }