mirror of https://github.com/milvus-io/milvus.git
fix: paramtable cache cause dynamic config non-dynamic (#33473)
relate: https://github.com/milvus-io/milvus/issues/33461 Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/33599/head
parent
ac5e098e13
commit
2422084a29
|
@ -411,7 +411,7 @@ func (mr *MilvusRoles) Run() {
|
|||
}
|
||||
|
||||
tracer.SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat())
|
||||
log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
|
||||
log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()), zap.Float64("SampleFraction", params.TraceCfg.SampleFraction.GetAsFloat()))
|
||||
|
||||
if paramtable.GetRole() == typeutil.QueryNodeRole || paramtable.GetRole() == typeutil.StandaloneRole {
|
||||
initcore.InitTraceConfig(params)
|
||||
|
|
|
@ -78,6 +78,9 @@ func (es EnvSource) GetSourceName() string {
|
|||
return "EnvironmentSource"
|
||||
}
|
||||
|
||||
func (es EnvSource) SetManager(m ConfigManager) {
|
||||
}
|
||||
|
||||
func (es EnvSource) SetEventHandler(eh EventHandler) {
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -44,6 +45,7 @@ type EtcdSource struct {
|
|||
|
||||
updateMu sync.Mutex
|
||||
configRefresher *refresher
|
||||
manager ConfigManager
|
||||
}
|
||||
|
||||
func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) {
|
||||
|
@ -115,6 +117,12 @@ func (es *EtcdSource) Close() {
|
|||
es.configRefresher.stop()
|
||||
}
|
||||
|
||||
func (es *EtcdSource) SetManager(m ConfigManager) {
|
||||
es.Lock()
|
||||
defer es.Unlock()
|
||||
es.manager = m
|
||||
}
|
||||
|
||||
func (es *EtcdSource) SetEventHandler(eh EventHandler) {
|
||||
es.configRefresher.SetEventHandler(eh)
|
||||
}
|
||||
|
@ -172,6 +180,9 @@ func (es *EtcdSource) update(configs map[string]string) error {
|
|||
return err
|
||||
}
|
||||
es.currentConfigs = configs
|
||||
if es.manager != nil {
|
||||
es.manager.EvictCacheValueByFormat(lo.Map(events, func(event *Event, _ int) string { return event.Key })...)
|
||||
}
|
||||
es.Unlock()
|
||||
|
||||
es.configRefresher.fireEvents(events...)
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"github.com/spf13/cast"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
|
@ -36,6 +37,7 @@ type FileSource struct {
|
|||
|
||||
updateMu sync.Mutex
|
||||
configRefresher *refresher
|
||||
manager ConfigManager
|
||||
}
|
||||
|
||||
func NewFileSource(fileInfo *FileInfo) *FileSource {
|
||||
|
@ -91,6 +93,12 @@ func (fs *FileSource) Close() {
|
|||
fs.configRefresher.stop()
|
||||
}
|
||||
|
||||
func (fs *FileSource) SetManager(m ConfigManager) {
|
||||
fs.Lock()
|
||||
defer fs.Unlock()
|
||||
fs.manager = m
|
||||
}
|
||||
|
||||
func (fs *FileSource) SetEventHandler(eh EventHandler) {
|
||||
fs.RWMutex.Lock()
|
||||
defer fs.RWMutex.Unlock()
|
||||
|
@ -173,6 +181,9 @@ func (fs *FileSource) update(configs map[string]string) error {
|
|||
return err
|
||||
}
|
||||
fs.configs = configs
|
||||
if fs.manager != nil {
|
||||
fs.manager.EvictCacheValueByFormat(lo.Map(events, func(event *Event, _ int) string { return event.Key })...)
|
||||
}
|
||||
fs.Unlock()
|
||||
|
||||
fs.configRefresher.fireEvents(events...)
|
||||
|
|
|
@ -116,6 +116,16 @@ func (m *Manager) EvictCachedValue(key string) {
|
|||
m.configCache.Remove(key)
|
||||
}
|
||||
|
||||
func (m *Manager) EvictCacheValueByFormat(keys ...string) {
|
||||
set := typeutil.NewSet(keys...)
|
||||
m.configCache.Range(func(key string, value interface{}) bool {
|
||||
if set.Contain(formatKey(key)) {
|
||||
m.configCache.Remove(key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) GetConfig(key string) (string, error) {
|
||||
realKey := formatKey(key)
|
||||
v, ok := m.overlays.Get(realKey)
|
||||
|
@ -210,6 +220,7 @@ func (m *Manager) AddSource(source Source) error {
|
|||
return err
|
||||
}
|
||||
|
||||
source.SetManager(m)
|
||||
m.sources.Insert(sourceName, source)
|
||||
|
||||
err := m.pullSourceConfigs(sourceName)
|
||||
|
|
|
@ -270,6 +270,9 @@ func (ErrSource) GetPriority() int {
|
|||
return 2
|
||||
}
|
||||
|
||||
func (ErrSource) SetManager(m ConfigManager) {
|
||||
}
|
||||
|
||||
// GetSourceName implements Source
|
||||
func (ErrSource) GetSourceName() string {
|
||||
return "ErrSource"
|
||||
|
|
|
@ -23,12 +23,17 @@ const (
|
|||
LowPriority = NormalPriority + 10
|
||||
)
|
||||
|
||||
type ConfigManager interface {
|
||||
EvictCacheValueByFormat(keys ...string)
|
||||
}
|
||||
|
||||
type Source interface {
|
||||
GetConfigurations() (map[string]string, error)
|
||||
GetConfigurationByKey(string) (string, error)
|
||||
GetPriority() int
|
||||
GetSourceName() string
|
||||
SetEventHandler(eh EventHandler)
|
||||
SetManager(m ConfigManager)
|
||||
UpdateOptions(opt Options)
|
||||
Close()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue