From 2422084a29410a006018d5159054143ba99a28c8 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Tue, 4 Jun 2024 11:39:46 +0800 Subject: [PATCH] fix: paramtable cache cause dynamic config non-dynamic (#33473) relate: https://github.com/milvus-io/milvus/issues/33461 Signed-off-by: aoiasd --- cmd/roles/roles.go | 2 +- pkg/config/env_source.go | 3 +++ pkg/config/etcd_source.go | 11 +++++++++++ pkg/config/file_source.go | 11 +++++++++++ pkg/config/manager.go | 11 +++++++++++ pkg/config/manager_test.go | 3 +++ pkg/config/source.go | 5 +++++ 7 files changed, 45 insertions(+), 1 deletion(-) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 1105498f5f..16be7ac378 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -411,7 +411,7 @@ func (mr *MilvusRoles) Run() { } tracer.SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat()) - log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue())) + log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()), zap.Float64("SampleFraction", params.TraceCfg.SampleFraction.GetAsFloat())) if paramtable.GetRole() == typeutil.QueryNodeRole || paramtable.GetRole() == typeutil.StandaloneRole { initcore.InitTraceConfig(params) diff --git a/pkg/config/env_source.go b/pkg/config/env_source.go index abef8bb821..b36ee5917b 100644 --- a/pkg/config/env_source.go +++ b/pkg/config/env_source.go @@ -78,6 +78,9 @@ func (es EnvSource) GetSourceName() string { return "EnvironmentSource" } +func (es EnvSource) SetManager(m ConfigManager) { +} + func (es EnvSource) SetEventHandler(eh EventHandler) { } diff --git a/pkg/config/etcd_source.go b/pkg/config/etcd_source.go index 9c87d0fc1c..29f49278d7 100644 --- a/pkg/config/etcd_source.go +++ b/pkg/config/etcd_source.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -44,6 +45,7 @@ type EtcdSource struct { updateMu sync.Mutex configRefresher *refresher + manager ConfigManager } func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) { @@ -115,6 +117,12 @@ func (es *EtcdSource) Close() { es.configRefresher.stop() } +func (es *EtcdSource) SetManager(m ConfigManager) { + es.Lock() + defer es.Unlock() + es.manager = m +} + func (es *EtcdSource) SetEventHandler(eh EventHandler) { es.configRefresher.SetEventHandler(eh) } @@ -172,6 +180,9 @@ func (es *EtcdSource) update(configs map[string]string) error { return err } es.currentConfigs = configs + if es.manager != nil { + es.manager.EvictCacheValueByFormat(lo.Map(events, func(event *Event, _ int) string { return event.Key })...) + } es.Unlock() es.configRefresher.fireEvents(events...) diff --git a/pkg/config/file_source.go b/pkg/config/file_source.go index 6c1ba11bf1..9a1ab3f863 100644 --- a/pkg/config/file_source.go +++ b/pkg/config/file_source.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/spf13/cast" "github.com/spf13/viper" "go.uber.org/zap" @@ -36,6 +37,7 @@ type FileSource struct { updateMu sync.Mutex configRefresher *refresher + manager ConfigManager } func NewFileSource(fileInfo *FileInfo) *FileSource { @@ -91,6 +93,12 @@ func (fs *FileSource) Close() { fs.configRefresher.stop() } +func (fs *FileSource) SetManager(m ConfigManager) { + fs.Lock() + defer fs.Unlock() + fs.manager = m +} + func (fs *FileSource) SetEventHandler(eh EventHandler) { fs.RWMutex.Lock() defer fs.RWMutex.Unlock() @@ -173,6 +181,9 @@ func (fs *FileSource) update(configs map[string]string) error { return err } fs.configs = configs + if fs.manager != nil { + fs.manager.EvictCacheValueByFormat(lo.Map(events, func(event *Event, _ int) string { return event.Key })...) + } fs.Unlock() fs.configRefresher.fireEvents(events...) diff --git a/pkg/config/manager.go b/pkg/config/manager.go index b33993296b..7e8c100255 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -116,6 +116,16 @@ func (m *Manager) EvictCachedValue(key string) { m.configCache.Remove(key) } +func (m *Manager) EvictCacheValueByFormat(keys ...string) { + set := typeutil.NewSet(keys...) + m.configCache.Range(func(key string, value interface{}) bool { + if set.Contain(formatKey(key)) { + m.configCache.Remove(key) + } + return true + }) +} + func (m *Manager) GetConfig(key string) (string, error) { realKey := formatKey(key) v, ok := m.overlays.Get(realKey) @@ -210,6 +220,7 @@ func (m *Manager) AddSource(source Source) error { return err } + source.SetManager(m) m.sources.Insert(sourceName, source) err := m.pullSourceConfigs(sourceName) diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go index ef4c2290ab..b955071661 100644 --- a/pkg/config/manager_test.go +++ b/pkg/config/manager_test.go @@ -270,6 +270,9 @@ func (ErrSource) GetPriority() int { return 2 } +func (ErrSource) SetManager(m ConfigManager) { +} + // GetSourceName implements Source func (ErrSource) GetSourceName() string { return "ErrSource" diff --git a/pkg/config/source.go b/pkg/config/source.go index 6a2cfbae04..61a22e320f 100644 --- a/pkg/config/source.go +++ b/pkg/config/source.go @@ -23,12 +23,17 @@ const ( LowPriority = NormalPriority + 10 ) +type ConfigManager interface { + EvictCacheValueByFormat(keys ...string) +} + type Source interface { GetConfigurations() (map[string]string, error) GetConfigurationByKey(string) (string, error) GetPriority() int GetSourceName() string SetEventHandler(eh EventHandler) + SetManager(m ConfigManager) UpdateOptions(opt Options) Close() }