mirror of https://github.com/milvus-io/milvus.git
fix: Remove watching config event in rate limiter (#32313)
Remove watching config event in rate limiter to prevent object leaks. Instead, reset limiter rates periodically. issue: https://github.com/milvus-io/milvus/issues/32312 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/32536/head
parent
16b8b7b35d
commit
3119a7b23f
|
@ -30,7 +30,6 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||||
"github.com/milvus-io/milvus/internal/util/quota"
|
"github.com/milvus-io/milvus/internal/util/quota"
|
||||||
rlinternal "github.com/milvus-io/milvus/internal/util/ratelimitutil"
|
rlinternal "github.com/milvus-io/milvus/internal/util/ratelimitutil"
|
||||||
"github.com/milvus-io/milvus/pkg/config"
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
|
@ -168,6 +167,28 @@ func (m *SimpleLimiter) GetQuotaStates() ([]milvuspb.QuotaState, []string) {
|
||||||
func (m *SimpleLimiter) SetRates(rootLimiter *proxypb.LimiterNode) error {
|
func (m *SimpleLimiter) SetRates(rootLimiter *proxypb.LimiterNode) error {
|
||||||
m.quotaStatesMu.Lock()
|
m.quotaStatesMu.Lock()
|
||||||
defer m.quotaStatesMu.Unlock()
|
defer m.quotaStatesMu.Unlock()
|
||||||
|
|
||||||
|
// Reset the limiter rates due to potential changes in configurations.
|
||||||
|
var (
|
||||||
|
clusterConfigs = getDefaultLimiterConfig(internalpb.RateScope_Cluster)
|
||||||
|
databaseConfigs = getDefaultLimiterConfig(internalpb.RateScope_Database)
|
||||||
|
collectionConfigs = getDefaultLimiterConfig(internalpb.RateScope_Collection)
|
||||||
|
partitionConfigs = getDefaultLimiterConfig(internalpb.RateScope_Partition)
|
||||||
|
)
|
||||||
|
initLimiter(m.rateLimiter.GetRootLimiters(), clusterConfigs)
|
||||||
|
m.rateLimiter.GetRootLimiters().GetChildren().Range(func(_ int64, dbLimiter *rlinternal.RateLimiterNode) bool {
|
||||||
|
initLimiter(dbLimiter, databaseConfigs)
|
||||||
|
dbLimiter.GetChildren().Range(func(_ int64, collLimiter *rlinternal.RateLimiterNode) bool {
|
||||||
|
initLimiter(collLimiter, collectionConfigs)
|
||||||
|
collLimiter.GetChildren().Range(func(_ int64, partitionLimiter *rlinternal.RateLimiterNode) bool {
|
||||||
|
initLimiter(partitionLimiter, partitionConfigs)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
if err := m.updateRateLimiter(rootLimiter); err != nil {
|
if err := m.updateRateLimiter(rootLimiter); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -182,26 +203,6 @@ func initLimiter(rln *rlinternal.RateLimiterNode, rateLimiterConfigs map[interna
|
||||||
limit := ratelimitutil.Limit(p.GetAsFloat())
|
limit := ratelimitutil.Limit(p.GetAsFloat())
|
||||||
burst := p.GetAsFloat() // use rate as burst, because SimpleLimiter is with punishment mechanism, burst is insignificant.
|
burst := p.GetAsFloat() // use rate as burst, because SimpleLimiter is with punishment mechanism, burst is insignificant.
|
||||||
rln.GetLimiters().GetOrInsert(rt, ratelimitutil.NewLimiter(limit, burst))
|
rln.GetLimiters().GetOrInsert(rt, ratelimitutil.NewLimiter(limit, burst))
|
||||||
onEvent := func(rateType internalpb.RateType, formatFunc func(originValue string) string) func(*config.Event) {
|
|
||||||
return func(event *config.Event) {
|
|
||||||
f, err := strconv.ParseFloat(formatFunc(event.Value), 64)
|
|
||||||
if err != nil {
|
|
||||||
log.Info("Error format for rateLimit",
|
|
||||||
zap.String("rateType", rateType.String()),
|
|
||||||
zap.String("key", event.Key),
|
|
||||||
zap.String("value", event.Value),
|
|
||||||
zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
l, ok := rln.GetLimiters().Get(rateType)
|
|
||||||
if !ok {
|
|
||||||
log.Info("rateLimiter not found for rateType", zap.String("rateType", rateType.String()))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
l.SetLimit(ratelimitutil.Limit(f))
|
|
||||||
}
|
|
||||||
}(rt, p.Formatter)
|
|
||||||
paramtable.Get().Watch(p.Key, config.NewHandler(fmt.Sprintf("rateLimiter-%d", rt), onEvent))
|
|
||||||
log.RatedDebug(30, "RateLimiter register for rateType",
|
log.RatedDebug(30, "RateLimiter register for rateType",
|
||||||
zap.String("rateType", internalpb.RateType_name[(int32(rt))]),
|
zap.String("rateType", internalpb.RateType_name[(int32(rt))]),
|
||||||
zap.String("rateLimit", ratelimitutil.Limit(p.GetAsFloat()).String()),
|
zap.String("rateLimit", ratelimitutil.Limit(p.GetAsFloat()).String()),
|
||||||
|
@ -235,8 +236,8 @@ func newCollectionLimiters() *rlinternal.RateLimiterNode {
|
||||||
|
|
||||||
func newPartitionLimiters() *rlinternal.RateLimiterNode {
|
func newPartitionLimiters() *rlinternal.RateLimiterNode {
|
||||||
partRateLimiters := rlinternal.NewRateLimiterNode(internalpb.RateScope_Partition)
|
partRateLimiters := rlinternal.NewRateLimiterNode(internalpb.RateScope_Partition)
|
||||||
collectionLimiterConfigs := getDefaultLimiterConfig(internalpb.RateScope_Partition)
|
partitionLimiterConfigs := getDefaultLimiterConfig(internalpb.RateScope_Partition)
|
||||||
initLimiter(partRateLimiters, collectionLimiterConfigs)
|
initLimiter(partRateLimiters, partitionLimiterConfigs)
|
||||||
return partRateLimiters
|
return partRateLimiters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue