mirror of https://github.com/milvus-io/milvus.git
enhance: adjust config source for support config event use paramtable (#29995)
Adjust config source for support config event which for dynamic config could use paramtable and not deadlock. relate: https://github.com/milvus-io/milvus/issues/29807 Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/30326/head
parent
f2985d8454
commit
8385157717
|
@ -37,11 +37,12 @@ const (
|
|||
|
||||
type EtcdSource struct {
|
||||
sync.RWMutex
|
||||
etcdCli *clientv3.Client
|
||||
ctx context.Context
|
||||
currentConfig map[string]string
|
||||
keyPrefix string
|
||||
etcdCli *clientv3.Client
|
||||
ctx context.Context
|
||||
currentConfigs map[string]string
|
||||
keyPrefix string
|
||||
|
||||
updateMu sync.Mutex
|
||||
configRefresher *refresher
|
||||
}
|
||||
|
||||
|
@ -59,10 +60,10 @@ func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) {
|
|||
return nil, err
|
||||
}
|
||||
es := &EtcdSource{
|
||||
etcdCli: etcdCli,
|
||||
ctx: context.Background(),
|
||||
currentConfig: make(map[string]string),
|
||||
keyPrefix: etcdInfo.KeyPrefix,
|
||||
etcdCli: etcdCli,
|
||||
ctx: context.Background(),
|
||||
currentConfigs: make(map[string]string),
|
||||
keyPrefix: etcdInfo.KeyPrefix,
|
||||
}
|
||||
es.configRefresher = newRefresher(etcdInfo.RefreshInterval, es.refreshConfigurations)
|
||||
return es, nil
|
||||
|
@ -71,7 +72,7 @@ func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) {
|
|||
// GetConfigurationByKey implements ConfigSource
|
||||
func (es *EtcdSource) GetConfigurationByKey(key string) (string, error) {
|
||||
es.RLock()
|
||||
v, ok := es.currentConfig[key]
|
||||
v, ok := es.currentConfigs[key]
|
||||
es.RUnlock()
|
||||
if !ok {
|
||||
return "", fmt.Errorf("key not found: %s", key)
|
||||
|
@ -88,7 +89,7 @@ func (es *EtcdSource) GetConfigurations() (map[string]string, error) {
|
|||
}
|
||||
es.configRefresher.start(es.GetSourceName())
|
||||
es.RLock()
|
||||
for key, value := range es.currentConfig {
|
||||
for key, value := range es.currentConfigs {
|
||||
configMap[key] = value
|
||||
}
|
||||
es.RUnlock()
|
||||
|
@ -152,12 +153,24 @@ func (es *EtcdSource) refreshConfigurations() error {
|
|||
newConfig[formatKey(key)] = string(kv.Value)
|
||||
log.Debug("got config from etcd", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
|
||||
}
|
||||
return es.update(newConfig)
|
||||
}
|
||||
|
||||
func (es *EtcdSource) update(configs map[string]string) error {
|
||||
// make sure config not change when fire event
|
||||
es.updateMu.Lock()
|
||||
defer es.updateMu.Unlock()
|
||||
|
||||
es.Lock()
|
||||
defer es.Unlock()
|
||||
err = es.configRefresher.fireEvents(es.GetSourceName(), es.currentConfig, newConfig)
|
||||
events, err := PopulateEvents(es.GetSourceName(), es.currentConfigs, configs)
|
||||
if err != nil {
|
||||
es.Unlock()
|
||||
log.Warn("generating event error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
es.currentConfig = newConfig
|
||||
es.currentConfigs = configs
|
||||
es.Unlock()
|
||||
|
||||
es.configRefresher.fireEvents(events...)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ type FileSource struct {
|
|||
files []string
|
||||
configs map[string]string
|
||||
|
||||
updateMu sync.Mutex
|
||||
configRefresher *refresher
|
||||
}
|
||||
|
||||
|
@ -154,13 +155,26 @@ func (fs *FileSource) loadFromFile() error {
|
|||
}
|
||||
}
|
||||
|
||||
return fs.update(newConfig)
|
||||
}
|
||||
|
||||
// update souce config
|
||||
// make sure only update changes configs
|
||||
func (fs *FileSource) update(configs map[string]string) error {
|
||||
// make sure config not change when fire event
|
||||
fs.updateMu.Lock()
|
||||
defer fs.updateMu.Unlock()
|
||||
|
||||
fs.Lock()
|
||||
defer fs.Unlock()
|
||||
err := fs.configRefresher.fireEvents(fs.GetSourceName(), fs.configs, newConfig)
|
||||
events, err := PopulateEvents(fs.GetSourceName(), fs.configs, configs)
|
||||
if err != nil {
|
||||
fs.Unlock()
|
||||
log.Warn("generating event error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
fs.configs = newConfig
|
||||
fs.configs = configs
|
||||
fs.Unlock()
|
||||
|
||||
fs.configRefresher.fireEvents(events...)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -80,12 +80,7 @@ func (r *refresher) refreshPeriodically(name string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *refresher) fireEvents(name string, source, target map[string]string) error {
|
||||
events, err := PopulateEvents(name, source, target)
|
||||
if err != nil {
|
||||
log.Warn("generating event error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
func (r *refresher) fireEvents(events ...*Event) {
|
||||
// Generate OnEvent Callback based on the events created
|
||||
ptr := r.eh.Load()
|
||||
if ptr != nil && *ptr != nil {
|
||||
|
@ -93,7 +88,6 @@ func (r *refresher) fireEvents(name string, source, target map[string]string) er
|
|||
(*ptr).OnEvent(e)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *refresher) SetEventHandler(eh EventHandler) {
|
||||
|
|
Loading…
Reference in New Issue