fix: remove Unnecessary lock in config manager (#29836)

issue: #29709 #291712
to avoid concurrent recursive RLock and Lock cause deadlock, This PR
remove the unnecessary lock in config manager

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/29901/head
wei liu 2024-01-11 13:48:49 +08:00 committed by GitHub
parent 9fc5f1176c
commit 1f759837c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 201 additions and 90 deletions

View File

@ -18,7 +18,6 @@ package config
import (
"fmt"
"strings"
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
@ -80,36 +79,33 @@ func filterate(key string, filters ...Filter) (string, bool) {
}
type Manager struct {
sync.RWMutex
Dispatcher *EventDispatcher
sources map[string]Source
keySourceMap map[string]string // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file
overlays map[string]string // store the highest priority configs which modified at runtime
forbiddenKeys typeutil.Set[string]
sources *typeutil.ConcurrentMap[string, Source]
keySourceMap *typeutil.ConcurrentMap[string, string] // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file
overlays *typeutil.ConcurrentMap[string, string] // store the highest priority configs which modified at runtime
forbiddenKeys *typeutil.ConcurrentSet[string]
}
func NewManager() *Manager {
return &Manager{
Dispatcher: NewEventDispatcher(),
sources: make(map[string]Source),
keySourceMap: make(map[string]string),
overlays: make(map[string]string),
forbiddenKeys: typeutil.NewSet[string](),
sources: typeutil.NewConcurrentMap[string, Source](),
keySourceMap: typeutil.NewConcurrentMap[string, string](),
overlays: typeutil.NewConcurrentMap[string, string](),
forbiddenKeys: typeutil.NewConcurrentSet[string](),
}
}
func (m *Manager) GetConfig(key string) (string, error) {
m.RLock()
defer m.RUnlock()
realKey := formatKey(key)
v, ok := m.overlays[realKey]
v, ok := m.overlays.Get(realKey)
if ok {
if v == TombValue {
return "", fmt.Errorf("key not found %s", key)
}
return v, nil
}
sourceName, ok := m.keySourceMap[realKey]
sourceName, ok := m.keySourceMap.Get(realKey)
if !ok {
return "", fmt.Errorf("key not found: %s", key)
}
@ -118,27 +114,27 @@ func (m *Manager) GetConfig(key string) (string, error) {
// GetConfigs returns all the key values
func (m *Manager) GetConfigs() map[string]string {
m.RLock()
defer m.RUnlock()
config := make(map[string]string)
for key := range m.keySourceMap {
m.keySourceMap.Range(func(key, value string) bool {
sValue, err := m.GetConfig(key)
if err != nil {
continue
return true
}
config[key] = sValue
}
for key, value := range m.overlays {
return true
})
m.overlays.Range(func(key, value string) bool {
config[key] = value
}
return true
})
return config
}
func (m *Manager) GetBy(filters ...Filter) map[string]string {
m.RLock()
defer m.RUnlock()
matchedConfig := make(map[string]string)
for key, value := range m.GetConfigs() {
@ -152,37 +148,33 @@ func (m *Manager) GetBy(filters ...Filter) map[string]string {
}
func (m *Manager) FileConfigs() map[string]string {
m.RLock()
defer m.RUnlock()
config := make(map[string]string)
for _, source := range m.sources {
if s, ok := source.(*FileSource); ok {
m.sources.Range(func(key string, value Source) bool {
if s, ok := value.(*FileSource); ok {
config, _ = s.GetConfigurations()
break
return false
}
}
return true
})
return config
}
func (m *Manager) Close() {
m.Lock()
defer m.Unlock()
for _, s := range m.sources {
s.Close()
}
m.sources.Range(func(key string, value Source) bool {
value.Close()
return true
})
}
func (m *Manager) AddSource(source Source) error {
m.Lock()
defer m.Unlock()
sourceName := source.GetSourceName()
_, ok := m.sources[sourceName]
_, ok := m.sources.Get(sourceName)
if ok {
err := errors.New("duplicate source supplied")
return err
}
m.sources[sourceName] = source
m.sources.Insert(sourceName, source)
err := m.pullSourceConfigs(sourceName)
if err != nil {
@ -198,55 +190,43 @@ func (m *Manager) AddSource(source Source) error {
// Update config at runtime, which can be called by others
// The most used scenario is UT
func (m *Manager) SetConfig(key, value string) {
m.Lock()
defer m.Unlock()
m.overlays[formatKey(key)] = value
m.overlays.Insert(formatKey(key), value)
}
func (m *Manager) SetMapConfig(key, value string) {
m.Lock()
defer m.Unlock()
m.overlays[strings.ToLower(key)] = value
m.overlays.Insert(strings.ToLower(key), value)
}
// Delete config at runtime, which has the highest priority to override all other sources
func (m *Manager) DeleteConfig(key string) {
m.Lock()
defer m.Unlock()
m.overlays[formatKey(key)] = TombValue
m.overlays.Insert(formatKey(key), TombValue)
}
// Remove the config which set at runtime, use config from sources
func (m *Manager) ResetConfig(key string) {
m.Lock()
defer m.Unlock()
delete(m.overlays, formatKey(key))
m.overlays.Remove(formatKey(key))
}
// Ignore any of update events, which means the config cannot auto refresh anymore
func (m *Manager) ForbidUpdate(key string) {
m.Lock()
defer m.Unlock()
m.forbiddenKeys.Insert(formatKey(key))
}
func (m *Manager) UpdateSourceOptions(opts ...Option) {
m.Lock()
defer m.Unlock()
var options Options
for _, opt := range opts {
opt(&options)
}
for _, source := range m.sources {
source.UpdateOptions(options)
}
m.sources.Range(func(key string, value Source) bool {
value.UpdateOptions(options)
return true
})
}
// Do not use it directly, only used when add source and unittests.
func (m *Manager) pullSourceConfigs(source string) error {
configSource, ok := m.sources[source]
configSource, ok := m.sources.Get(source)
if !ok {
return errors.New("invalid source or source not added")
}
@ -259,21 +239,21 @@ func (m *Manager) pullSourceConfigs(source string) error {
sourcePriority := configSource.GetPriority()
for key := range configs {
sourceName, ok := m.keySourceMap[key]
sourceName, ok := m.keySourceMap.Get(key)
if !ok { // if key do not exist then add source
m.keySourceMap[key] = source
m.keySourceMap.Insert(key, source)
continue
}
currentSource, ok := m.sources[sourceName]
currentSource, ok := m.sources.Get(sourceName)
if !ok {
m.keySourceMap[key] = source
m.keySourceMap.Insert(key, source)
continue
}
currentSrcPriority := currentSource.GetPriority()
if currentSrcPriority > sourcePriority { // lesser value has high priority
m.keySourceMap[key] = source
m.keySourceMap.Insert(key, source)
}
}
@ -281,7 +261,7 @@ func (m *Manager) pullSourceConfigs(source string) error {
}
func (m *Manager) getConfigValueBySource(configKey, sourceName string) (string, error) {
source, ok := m.sources[sourceName]
source, ok := m.sources.Get(sourceName)
if !ok {
return "", ErrKeyNotFound
}
@ -296,9 +276,9 @@ func (m *Manager) updateEvent(e *Event) error {
}
switch e.EventType {
case CreateType, UpdateType:
sourceName, ok := m.keySourceMap[e.Key]
sourceName, ok := m.keySourceMap.Get(e.Key)
if !ok {
m.keySourceMap[e.Key] = e.EventSource
m.keySourceMap.Insert(e.Key, e.EventSource)
e.EventType = CreateType
} else if sourceName == e.EventSource {
e.EventType = UpdateType
@ -310,12 +290,12 @@ func (m *Manager) updateEvent(e *Event) error {
e.EventSource, sourceName))
return ErrIgnoreChange
}
m.keySourceMap[e.Key] = e.EventSource
m.keySourceMap.Insert(e.Key, e.EventSource)
e.EventType = UpdateType
}
case DeleteType:
sourceName, ok := m.keySourceMap[e.Key]
sourceName, ok := m.keySourceMap.Get(e.Key)
if !ok || sourceName != e.EventSource {
// if delete event generated from source not maintained ignore it
log.Info(fmt.Sprintf("the event source %s (expect %s) is not maintained, ignore",
@ -325,9 +305,9 @@ func (m *Manager) updateEvent(e *Event) error {
// find less priority source or delete key
source := m.findNextBestSource(e.Key, sourceName)
if source == nil {
delete(m.keySourceMap, e.Key)
m.keySourceMap.Remove(e.Key)
} else {
m.keySourceMap[e.Key] = source.GetSourceName()
m.keySourceMap.Insert(e.Key, source.GetSourceName())
}
}
}
@ -339,8 +319,6 @@ func (m *Manager) updateEvent(e *Event) error {
// OnEvent Triggers actions when an event is generated
func (m *Manager) OnEvent(event *Event) {
m.Lock()
defer m.Unlock()
if m.forbiddenKeys.Contain(formatKey(event.Key)) {
log.Info("ignore event for forbidden key", zap.String("key", event.Key))
return
@ -358,31 +336,32 @@ func (m *Manager) GetIdentifier() string {
return "Manager"
}
func (m *Manager) findNextBestSource(key string, sourceName string) Source {
func (m *Manager) findNextBestSource(configKey string, sourceName string) Source {
var rSource Source
for _, source := range m.sources {
if source.GetSourceName() == sourceName {
continue
m.sources.Range(func(key string, value Source) bool {
if value.GetSourceName() == sourceName {
return true
}
_, err := source.GetConfigurationByKey(key)
_, err := value.GetConfigurationByKey(configKey)
if err != nil {
continue
return true
}
if rSource == nil {
rSource = source
continue
rSource = value
return true
}
if source.GetPriority() < rSource.GetPriority() { // less value has high priority
rSource = source
if value.GetPriority() < rSource.GetPriority() { // less value has high priority
rSource = value
}
}
return true
})
return rSource
}
func (m *Manager) getHighPrioritySource(srcNameA, srcNameB string) Source {
sourceA, okA := m.sources[srcNameA]
sourceB, okB := m.sources[srcNameB]
sourceA, okA := m.sources.Get(srcNameA)
sourceB, okB := m.sources.Get(srcNameB)
if !okA && !okB {
return nil

View File

@ -17,6 +17,7 @@
package config
import (
"context"
"os"
"path"
"testing"
@ -24,6 +25,9 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"golang.org/x/sync/errgroup"
)
func TestAllConfigFromManager(t *testing.T) {
@ -69,6 +73,134 @@ func TestAllDupliateSource(t *testing.T) {
assert.Error(t, err, "invalid source or source not added")
}
func TestBasic(t *testing.T) {
mgr, _ := Init()
// test set config
mgr.SetConfig("a.b", "aaa")
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "aaa")
_, err = mgr.GetConfig("a.a")
assert.Error(t, err)
// test delete config
mgr.SetConfig("a.b", "aaa")
mgr.DeleteConfig("a.b")
assert.Error(t, err)
// test reset config
mgr.ResetConfig("a.b")
assert.Error(t, err)
// test forbid config
envSource := NewEnvSource(formatKey)
err = mgr.AddSource(envSource)
assert.NoError(t, err)
envSource.configs.Insert("ab", "aaa")
mgr.OnEvent(&Event{
EventSource: envSource.GetSourceName(),
EventType: CreateType,
Key: "ab",
Value: "aaa",
})
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "aaa")
mgr.ForbidUpdate("a.b")
mgr.OnEvent(&Event{
EventSource: envSource.GetSourceName(),
EventType: UpdateType,
Key: "a.b",
Value: "bbb",
})
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "aaa")
configs := mgr.FileConfigs()
assert.Len(t, configs, 0)
}
func TestOnEvent(t *testing.T) {
cfg, _ := embed.ConfigFromFile("../../configs/advanced/etcd.yaml")
cfg.Dir = "/tmp/milvus/test"
e, err := embed.StartEtcd(cfg)
assert.NoError(t, err)
defer e.Close()
defer os.RemoveAll(cfg.Dir)
client := v3client.New(e.Server)
dir, _ := os.MkdirTemp("", "milvus")
yamlFile := path.Join(dir, "milvus.yaml")
mgr, _ := Init(WithEnvSource(formatKey),
WithFilesSource(&FileInfo{
Files: []string{yamlFile},
RefreshInterval: 10 * time.Millisecond,
}),
WithEtcdSource(&EtcdInfo{
Endpoints: []string{cfg.ACUrls[0].Host},
KeyPrefix: "test",
RefreshInterval: 10 * time.Millisecond,
}))
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second)
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "aaa")
ctx := context.Background()
client.KV.Put(ctx, "test/config/a/b", "bbb")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "bbb")
client.KV.Put(ctx, "test/config/a/b", "ccc")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600)
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
client.KV.Delete(ctx, "test/config/a/b")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ddd")
}
func TestDeadlock(t *testing.T) {
mgr, _ := Init()
// test concurrent lock and recursive rlock
wg, _ := errgroup.WithContext(context.Background())
wg.Go(func() error {
for i := 0; i < 100; i++ {
mgr.GetBy(WithPrefix("rootcoord."))
}
return nil
})
wg.Go(func() error {
for i := 0; i < 100; i++ {
mgr.SetConfig("rootcoord.xxx", "111")
}
return nil
})
wg.Wait()
}
type ErrSource struct{}
func (e ErrSource) Close() {

View File

@ -313,13 +313,13 @@ func createParamItem(v string) paramtable.ParamItem {
item := paramtable.ParamItem{
Formatter: func(originValue string) string { return v },
}
item.Init(&config.Manager{})
item.Init(config.NewManager())
return item
}*/
func initParamItem(item *paramtable.ParamItem, v string) {
item.Formatter = func(originValue string) string { return v }
item.Init(&config.Manager{})
item.Init(config.NewManager())
}
type kafkaCfgOption func(cfg *paramtable.KafkaConfig)

View File

@ -84,7 +84,7 @@ func TestServiceParam(t *testing.T) {
// test default value
{
pc := &PulsarConfig{}
base := &BaseTable{mgr: &config.Manager{}}
base := &BaseTable{mgr: config.NewManager()}
pc.Init(base)
assert.Empty(t, pc.Address.GetValue())
}
@ -163,7 +163,7 @@ func TestServiceParam(t *testing.T) {
// test default value
{
kc := &KafkaConfig{}
base := &BaseTable{mgr: &config.Manager{}}
base := &BaseTable{mgr: config.NewManager()}
kc.Init(base)
assert.Empty(t, kc.Address.GetValue())
assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN")