mirror of https://github.com/milvus-io/milvus.git
Fix some configs not shown (#21653)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>pull/21712/head
parent
4620aebd46
commit
90d9e165d4
|
@ -79,18 +79,18 @@ func filterate(key string, filters ...Filter) (string, bool) {
|
|||
|
||||
type Manager struct {
|
||||
sync.RWMutex
|
||||
Dispatcher *EventDispatcher
|
||||
sources map[string]Source
|
||||
keySourceMap map[string]string
|
||||
overlayConfigs map[string]string
|
||||
Dispatcher *EventDispatcher
|
||||
sources map[string]Source
|
||||
keySourceMap map[string]string // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file
|
||||
overlays map[string]string // store the highest priority configs which modified at runtime
|
||||
}
|
||||
|
||||
func NewManager() *Manager {
|
||||
return &Manager{
|
||||
Dispatcher: NewEventDispatcher(),
|
||||
sources: make(map[string]Source),
|
||||
keySourceMap: make(map[string]string),
|
||||
overlayConfigs: make(map[string]string),
|
||||
Dispatcher: NewEventDispatcher(),
|
||||
sources: make(map[string]Source),
|
||||
keySourceMap: make(map[string]string),
|
||||
overlays: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,7 @@ func (m *Manager) GetConfig(key string) (string, error) {
|
|||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
realKey := formatKey(key)
|
||||
v, ok := m.overlayConfigs[realKey]
|
||||
v, ok := m.overlays[realKey]
|
||||
if ok {
|
||||
if v == TombValue {
|
||||
return "", fmt.Errorf("key not found %s", key)
|
||||
|
@ -112,26 +112,6 @@ func (m *Manager) GetConfig(key string) (string, error) {
|
|||
return m.getConfigValueBySource(realKey, sourceName)
|
||||
}
|
||||
|
||||
func (m *Manager) GetBy(filters ...Filter) map[string]string {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
matchedConfig := make(map[string]string)
|
||||
|
||||
for key := range m.keySourceMap {
|
||||
newkey, ok := filterate(key, filters...)
|
||||
if ok {
|
||||
sValue, err := m.GetConfig(key)
|
||||
if err != nil {
|
||||
log.Error("Get some invalid config", zap.String("key", key))
|
||||
continue
|
||||
}
|
||||
matchedConfig[newkey] = sValue
|
||||
}
|
||||
}
|
||||
|
||||
return matchedConfig
|
||||
}
|
||||
|
||||
// GetConfigs returns all the key values
|
||||
func (m *Manager) GetConfigs() map[string]string {
|
||||
m.RLock()
|
||||
|
@ -149,6 +129,21 @@ func (m *Manager) GetConfigs() map[string]string {
|
|||
return config
|
||||
}
|
||||
|
||||
func (m *Manager) GetBy(filters ...Filter) map[string]string {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
matchedConfig := make(map[string]string)
|
||||
|
||||
for key, value := range m.GetConfigs() {
|
||||
newkey, ok := filterate(key, filters...)
|
||||
if ok {
|
||||
matchedConfig[newkey] = value
|
||||
}
|
||||
}
|
||||
|
||||
return matchedConfig
|
||||
}
|
||||
|
||||
func (m *Manager) Close() {
|
||||
for _, s := range m.sources {
|
||||
s.Close()
|
||||
|
@ -178,24 +173,22 @@ func (m *Manager) AddSource(source Source) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// For compatible reason, only visiable for Test
|
||||
func (m *Manager) SetConfig(key, value string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.overlayConfigs[formatKey(key)] = value
|
||||
m.overlays[formatKey(key)] = value
|
||||
}
|
||||
|
||||
// For compatible reason, only visiable for Test
|
||||
func (m *Manager) DeleteConfig(key string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.overlayConfigs[formatKey(key)] = TombValue
|
||||
m.overlays[formatKey(key)] = TombValue
|
||||
}
|
||||
|
||||
func (m *Manager) ResetConfig(key string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
delete(m.overlayConfigs, formatKey(key))
|
||||
delete(m.overlays, formatKey(key))
|
||||
}
|
||||
|
||||
// Do not use it directly, only used when add source and unittests.
|
||||
|
|
|
@ -21,6 +21,7 @@ class MilvusConan(ConanFile):
|
|||
"zlib/1.2.13",
|
||||
"libcurl/7.87.0",
|
||||
)
|
||||
|
||||
generators = "cmake"
|
||||
default_options = {
|
||||
"rocksdb:shared": True,
|
||||
|
|
|
@ -837,7 +837,7 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
|
|||
}
|
||||
|
||||
configList := make([]*commonpb.KeyValuePair, 0)
|
||||
for key, value := range Params.GetComponentConfigurations(ctx, "datacoord", req.Pattern) {
|
||||
for key, value := range Params.GetComponentConfigurations("datacoord", req.Pattern) {
|
||||
configList = append(configList,
|
||||
&commonpb.KeyValuePair{
|
||||
Key: key,
|
||||
|
|
|
@ -220,7 +220,7 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh
|
|||
}, nil
|
||||
}
|
||||
configList := make([]*commonpb.KeyValuePair, 0)
|
||||
for key, value := range Params.GetComponentConfigurations(ctx, "datanode", req.Pattern) {
|
||||
for key, value := range Params.GetComponentConfigurations("datanode", req.Pattern) {
|
||||
configList = append(configList,
|
||||
&commonpb.KeyValuePair{
|
||||
Key: key,
|
||||
|
|
|
@ -326,7 +326,7 @@ func (i *IndexNode) ShowConfigurations(ctx context.Context, req *internalpb.Show
|
|||
}
|
||||
|
||||
configList := make([]*commonpb.KeyValuePair, 0)
|
||||
for key, value := range Params.GetComponentConfigurations(ctx, "indexnode", req.Pattern) {
|
||||
for key, value := range Params.GetComponentConfigurations("indexnode", req.Pattern) {
|
||||
configList = append(configList,
|
||||
&commonpb.KeyValuePair{
|
||||
Key: key,
|
||||
|
|
|
@ -183,7 +183,7 @@ func (node *Proxy) Init() error {
|
|||
log.Info("init session for Proxy done")
|
||||
|
||||
node.factory.Init(Params)
|
||||
log.Debug("init parameters for factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.BaseTable.GetAll()))
|
||||
log.Debug("init parameters for factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.GetAll()))
|
||||
|
||||
accesslog.SetupAccseeLog(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
|
||||
log.Debug("init access log for Proxy done")
|
||||
|
|
|
@ -550,7 +550,7 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
|
|||
}, nil
|
||||
}
|
||||
configList := make([]*commonpb.KeyValuePair, 0)
|
||||
for key, value := range Params.GetComponentConfigurations(ctx, "querycoord", req.Pattern) {
|
||||
for key, value := range Params.GetComponentConfigurations("querycoord", req.Pattern) {
|
||||
configList = append(configList,
|
||||
&commonpb.KeyValuePair{
|
||||
Key: key,
|
||||
|
|
|
@ -1246,7 +1246,7 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
|
|||
defer node.wg.Done()
|
||||
|
||||
configList := make([]*commonpb.KeyValuePair, 0)
|
||||
for key, value := range Params.GetComponentConfigurations(ctx, "querynode", req.Pattern) {
|
||||
for key, value := range Params.GetComponentConfigurations("querynode", req.Pattern) {
|
||||
configList = append(configList,
|
||||
&commonpb.KeyValuePair{
|
||||
Key: key,
|
||||
|
|
|
@ -1421,7 +1421,7 @@ func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfi
|
|||
}
|
||||
|
||||
configList := make([]*commonpb.KeyValuePair, 0)
|
||||
for key, value := range Params.GetComponentConfigurations(ctx, "rootcoord", req.Pattern) {
|
||||
for key, value := range Params.GetComponentConfigurations("rootcoord", req.Pattern) {
|
||||
configList = append(configList,
|
||||
&commonpb.KeyValuePair{
|
||||
Key: key,
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
package paramtable
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
|
@ -168,10 +167,6 @@ func (gp *BaseTable) initConfPath() string {
|
|||
return configDir
|
||||
}
|
||||
|
||||
func (gp *BaseTable) Configs() map[string]string {
|
||||
return gp.mgr.GetConfigs()
|
||||
}
|
||||
|
||||
// Load loads an object with @key.
|
||||
func (gp *BaseTable) Load(key string) (string, error) {
|
||||
return gp.mgr.GetConfig(key)
|
||||
|
@ -190,19 +185,6 @@ func (gp *BaseTable) GetWithDefault(key, defaultValue string) string {
|
|||
return str
|
||||
}
|
||||
|
||||
func (gp *BaseTable) GetConfigSubSet(pattern string) map[string]string {
|
||||
return gp.mgr.GetBy(config.WithPrefix(pattern), config.RemovePrefix(pattern))
|
||||
}
|
||||
|
||||
func (gp *BaseTable) GetComponentConfigurations(ctx context.Context, componentName string, sub string) map[string]string {
|
||||
allownPrefixs := append(globalConfigPrefixs(), componentName+".")
|
||||
return gp.mgr.GetBy(config.WithSubstr(sub), config.WithOneOfPrefixs(allownPrefixs...))
|
||||
}
|
||||
|
||||
func (gp *BaseTable) GetAll() map[string]string {
|
||||
return gp.mgr.GetConfigs()
|
||||
}
|
||||
|
||||
// Remove Config by key
|
||||
func (gp *BaseTable) Remove(key string) error {
|
||||
gp.mgr.DeleteConfig(key)
|
||||
|
|
|
@ -28,25 +28,6 @@ func TestMain(m *testing.M) {
|
|||
os.Exit(code)
|
||||
}
|
||||
|
||||
func TestBaseTable_GetConfigSubSet(t *testing.T) {
|
||||
prefix := "rootcoord."
|
||||
configs := baseParams.mgr.GetConfigs()
|
||||
|
||||
configsWithPrefix := make(map[string]string)
|
||||
for k, v := range configs {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
configsWithPrefix[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
subSet := baseParams.GetConfigSubSet(prefix)
|
||||
|
||||
for k := range configs {
|
||||
assert.Equal(t, subSet[k], configs[prefix+k])
|
||||
}
|
||||
assert.Equal(t, len(subSet), len(configsWithPrefix))
|
||||
}
|
||||
|
||||
func TestBaseTable_DuplicateValues(t *testing.T) {
|
||||
baseParams.Save("rootcoord.dmlchannelnum", "10")
|
||||
baseParams.Save("rootcoorddmlchannelnum", "11")
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
config "github.com/milvus-io/milvus/internal/config"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/shirou/gopsutil/v3/disk"
|
||||
)
|
||||
|
@ -132,22 +133,19 @@ func (p *ComponentParam) Init() {
|
|||
p.IntegrationTestCfg.init(&p.BaseTable)
|
||||
}
|
||||
|
||||
func (p *ComponentParam) RocksmqEnable() bool {
|
||||
return p.RocksmqCfg.Path.GetValue() != ""
|
||||
func (p *ComponentParam) GetComponentConfigurations(componentName string, sub string) map[string]string {
|
||||
allownPrefixs := append(globalConfigPrefixs(), componentName+".")
|
||||
return p.mgr.GetBy(config.WithSubstr(sub), config.WithOneOfPrefixs(allownPrefixs...))
|
||||
}
|
||||
|
||||
func (p *ComponentParam) PulsarEnable() bool {
|
||||
return p.PulsarCfg.Address.GetValue() != ""
|
||||
}
|
||||
|
||||
func (p *ComponentParam) KafkaEnable() bool {
|
||||
return p.KafkaCfg.Address.GetValue() != ""
|
||||
func (p *ComponentParam) GetAll() map[string]string {
|
||||
return p.mgr.GetConfigs()
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
// --- common ---
|
||||
type commonConfig struct {
|
||||
ClusterPrefix ParamItem `refreshable:"true"`
|
||||
ClusterPrefix ParamItem `refreshable:"false"`
|
||||
|
||||
// Deprecated: do not use it anymore
|
||||
ProxySubName ParamItem `refreshable:"true"`
|
||||
|
|
|
@ -66,6 +66,18 @@ func (p *ServiceParam) Init() {
|
|||
p.MinioCfg.Init(&p.BaseTable)
|
||||
}
|
||||
|
||||
func (p *ServiceParam) RocksmqEnable() bool {
|
||||
return p.RocksmqCfg.Path.GetValue() != ""
|
||||
}
|
||||
|
||||
func (p *ServiceParam) PulsarEnable() bool {
|
||||
return p.PulsarCfg.Address.GetValue() != ""
|
||||
}
|
||||
|
||||
func (p *ServiceParam) KafkaEnable() bool {
|
||||
return p.KafkaCfg.Address.GetValue() != ""
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
// --- etcd ---
|
||||
type EtcdConfig struct {
|
||||
|
|
Loading…
Reference in New Issue