Refactor showConfigurations to allow return global config rather than only return config of this component (#21063)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/21042/head
aoiasd 2022-12-09 14:31:21 +08:00 committed by GitHub
parent 908023a06e
commit de0ab9e2cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 170 additions and 208 deletions

View File

@ -29,6 +29,54 @@ const (
TombValue = "TOMB_VAULE"
)
type Filter func(key string) (string, bool)
func WithSubstr(substring string) Filter {
substring = strings.ToLower(substring)
return func(key string) (string, bool) {
return key, strings.Contains(key, substring)
}
}
func WithPrefix(prefix string) Filter {
prefix = strings.ToLower(prefix)
return func(key string) (string, bool) {
return key, strings.HasPrefix(key, prefix)
}
}
func WithOneOfPrefixs(prefixs ...string) Filter {
for id, prefix := range prefixs {
prefixs[id] = strings.ToLower(prefix)
}
return func(key string) (string, bool) {
for _, prefix := range prefixs {
if strings.HasPrefix(key, prefix) {
return key, true
}
}
return key, false
}
}
func RemovePrefix(prefix string) Filter {
prefix = strings.ToLower(prefix)
return func(key string) (string, bool) {
return strings.Replace(key, prefix, "", 1), true
}
}
func filterate(key string, filters ...Filter) (string, bool) {
var ok bool
for _, filter := range filters {
key, ok = filter(key)
if !ok {
return key, ok
}
}
return key, ok
}
type Manager struct {
sync.RWMutex
Dispatcher *EventDispatcher
@ -64,34 +112,23 @@ func (m *Manager) GetConfig(key string) (string, error) {
return m.getConfigValueBySource(realKey, sourceName)
}
//GetConfigsByPattern returns key values that matched pattern
// withPrefix : whether key include the prefix of pattern
func (m *Manager) GetConfigsByPattern(pattern string, withPrefix bool) map[string]string {
func (m *Manager) GetBy(filters ...Filter) map[string]string {
m.RLock()
defer m.RUnlock()
matchedConfig := make(map[string]string)
pattern = strings.ToLower(pattern)
for key, value := range m.keySourceMap {
result := strings.HasPrefix(key, pattern)
if result {
for key, value := range m.keySourceMap {
newkey, ok := filterate(key, filters...)
if ok {
sValue, err := m.getConfigValueBySource(key, value)
if err != nil {
log.Error("Get some invalid config", zap.String("key", key))
continue
}
checkAndCutOffKey := func() string {
if withPrefix {
return key
}
return strings.Replace(key, pattern, "", 1)
}
finalKey := checkAndCutOffKey()
matchedConfig[finalKey] = sValue
matchedConfig[newkey] = sValue
}
}
return matchedConfig
}

View File

@ -25,7 +25,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -40,28 +39,6 @@ func (s *Server) getQuotaMetrics() *metricsinfo.DataCoordQuotaMetrics {
}
}
//getComponentConfigurations returns the configurations of dataNode matching req.Pattern
func getComponentConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) *internalpb.ShowConfigurationsResponse {
prefix := "datacoord."
matchedConfig := Params.GetByPattern(prefix + req.Pattern)
configList := make([]*commonpb.KeyValuePair, 0, len(matchedConfig))
for key, value := range matchedConfig {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}
}
// getSystemInfoMetrics composes data cluster metrics
func (s *Server) getSystemInfoMetrics(
ctx context.Context,

View File

@ -1009,7 +1009,7 @@ func TestServer_watchQueryCoord(t *testing.T) {
func TestServer_ShowConfigurations(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
pattern := "Port"
pattern := "datacoord.Port"
req := &internalpb.ShowConfigurationsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,

View File

@ -830,7 +830,22 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
}, nil
}
return getComponentConfigurations(ctx, req), nil
configList := make([]*commonpb.KeyValuePair, 0)
for key, value := range Params.GetComponentConfigurations(ctx, "datacoord", req.Pattern) {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}, nil
}
// GetMetrics returns DataCoord metrics info

View File

@ -736,8 +736,22 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh
Configuations: nil,
}, nil
}
configList := make([]*commonpb.KeyValuePair, 0)
for key, value := range Params.GetComponentConfigurations(ctx, "datanode", req.Pattern) {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return getComponentConfigurations(ctx, req), nil
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}, nil
}
// GetMetrics return datanode metrics

View File

@ -387,7 +387,7 @@ func TestDataNode(t *testing.T) {
})
t.Run("Test ShowConfigurations", func(t *testing.T) {
pattern := "Port"
pattern := "datanode.Port"
req := &internalpb.ShowConfigurationsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,

View File

@ -21,7 +21,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -61,28 +60,6 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
}, nil
}
//getComponentConfigurations returns the configurations of dataNode matching req.Pattern
func getComponentConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) *internalpb.ShowConfigurationsResponse {
prefix := "datanode."
matchedConfig := Params.GetByPattern(prefix + req.Pattern)
configList := make([]*commonpb.KeyValuePair, 0, len(matchedConfig))
for key, value := range matchedConfig {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}
}
func (node *DataNode) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
// TODO(dragondriver): add more metrics
usedMem := hardware.GetUsedMemoryCount()

View File

@ -946,7 +946,22 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho
}, nil
}
return getComponentConfigurations(ctx, req), nil
configList := make([]*commonpb.KeyValuePair, 0)
for key, value := range Params.GetComponentConfigurations(ctx, "indexcoord", req.Pattern) {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}, nil
}
// GetMetrics gets the metrics info of IndexCoord.

View File

@ -479,7 +479,7 @@ func testIndexCoord(t *testing.T) {
})
t.Run("Showconfigurations, port", func(t *testing.T) {
pattern := "Port"
pattern := "indexcoord.Port"
req := &internalpb.ShowConfigurationsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,

View File

@ -24,7 +24,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -32,28 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
)
//getComponentConfigurations returns the configurations of indexCoord matching req.Pattern
func getComponentConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) *internalpb.ShowConfigurationsResponse {
prefix := "indexcoord."
matchedConfig := Params.GetByPattern(prefix + req.Pattern)
configList := make([]*commonpb.KeyValuePair, 0, len(matchedConfig))
for key, value := range matchedConfig {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}
}
// TODO(dragondriver): add more detailed metrics
func getSystemInfoMetrics(
ctx context.Context,

View File

@ -325,7 +325,22 @@ func (i *IndexNode) ShowConfigurations(ctx context.Context, req *internalpb.Show
}, nil
}
return getComponentConfigurations(ctx, req), nil
configList := make([]*commonpb.KeyValuePair, 0)
for key, value := range Params.GetComponentConfigurations(ctx, "indexnode", req.Pattern) {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}, nil
}
func (i *IndexNode) SetAddress(address string) {

View File

@ -21,35 +21,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
//getComponentConfigurations returns the configurations of queryNode matching req.Pattern
func getComponentConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) *internalpb.ShowConfigurationsResponse {
prefix := "indexnode."
matchedConfig := Params.GetByPattern(prefix + req.Pattern)
configList := make([]*commonpb.KeyValuePair, 0, len(matchedConfig))
for key, value := range matchedConfig {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}
}
// TODO(dragondriver): maybe IndexNode should be an interface so that we can mock it in the test cases
func getSystemInfoMetrics(
ctx context.Context,

View File

@ -507,11 +507,8 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy),
}, nil
}
prefix := "querycoord."
matchedConfig := Params.GetByPattern(prefix + req.Pattern)
configList := make([]*commonpb.KeyValuePair, 0, len(matchedConfig))
for key, value := range matchedConfig {
configList := make([]*commonpb.KeyValuePair, 0)
for key, value := range Params.GetComponentConfigurations(ctx, "querycoord", req.Pattern) {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,

View File

@ -753,7 +753,7 @@ func (suite *ServiceSuite) TestShowConfigurations() {
server := suite.server
req := &internalpb.ShowConfigurationsRequest{
Pattern: "Port",
Pattern: "querycoord.Port",
}
resp, err := server.ShowConfigurations(ctx, req)
suite.NoError(err)
@ -764,7 +764,7 @@ func (suite *ServiceSuite) TestShowConfigurations() {
// Test when server is not healthy
server.UpdateStateCode(commonpb.StateCode_Initializing)
req = &internalpb.ShowConfigurationsRequest{
Pattern: "Port",
Pattern: "querycoord.Port",
}
resp, err = server.ShowConfigurations(ctx, req)
suite.NoError(err)

View File

@ -1216,7 +1216,22 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
node.wg.Add(1)
defer node.wg.Done()
return getComponentConfigurations(ctx, req), nil
configList := make([]*commonpb.KeyValuePair, 0)
for key, value := range Params.GetComponentConfigurations(ctx, "querynode", req.Pattern) {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}, nil
}
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...

View File

@ -21,7 +21,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -29,28 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
//getComponentConfigurations returns the configurations of queryNode matching req.Pattern
func getComponentConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) *internalpb.ShowConfigurationsResponse {
prefix := "querynode."
matchedConfig := Params.GetByPattern(prefix + req.Pattern)
configList := make([]*commonpb.KeyValuePair, 0, len(matchedConfig))
for key, value := range matchedConfig {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}
}
// getQuotaMetrics returns QueryNodeQuotaMetrics.
func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) {
var err error

View File

@ -25,7 +25,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@ -65,32 +64,3 @@ func TestGetSystemInfoMetrics(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
rateCol.Register(metricsinfo.NQPerSecond)
}
func TestGetComponentConfigurationsFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
require.NoError(t, err)
defer node.Stop()
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.NoError(t, err)
defer etcdCli.Close()
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)
req := &internalpb.ShowConfigurationsRequest{
Base: genCommonMsgBase(commonpb.MsgType_WatchQueryChannels, node.session.ServerID),
Pattern: "Cache",
}
resq := getComponentConfigurations(ctx, req)
assert.Equal(t, resq.Status.ErrorCode, commonpb.ErrorCode_Success)
}

View File

@ -24,35 +24,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
//getComponentConfigurations returns the configurations of rootcoord matching req.Pattern
func getComponentConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) *internalpb.ShowConfigurationsResponse {
prefix := "rootcoord."
matchedConfig := Params.GetByPattern(prefix + req.Pattern)
configList := make([]*commonpb.KeyValuePair, 0, len(matchedConfig))
for key, value := range matchedConfig {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}
}
func (c *Core) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
rootCoordTopology := metricsinfo.RootCoordTopology{
Self: metricsinfo.RootCoordInfos{

View File

@ -1412,7 +1412,22 @@ func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfi
}, nil
}
return getComponentConfigurations(ctx, req), nil
configList := make([]*commonpb.KeyValuePair, 0)
for key, value := range Params.GetComponentConfigurations(ctx, "rootcoord", req.Pattern) {
configList = append(configList,
&commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Configuations: configList,
}, nil
}
// GetMetrics get metrics

View File

@ -693,7 +693,7 @@ func TestRootCoord_ShowConfigurations(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
Params.InitOnce()
pattern := "Port"
pattern := "rootcoord.Port"
req := &internalpb.ShowConfigurationsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),

View File

@ -12,6 +12,7 @@
package paramtable
import (
"context"
"fmt"
"os"
"path"
@ -55,6 +56,11 @@ const (
DefaultMaxBackups = 20
)
//Const of Global Config List
func globalConfigPrefixs() []string {
return []string{"metastore.", "localStorage.", "etcd.", "mysql.", "minio.", "pulsar.", "kafka.", "rocksmq.", "log.", "grpc.", "common.", "quotaAndLimits."}
}
var defaultYaml = DefaultMilvusYaml
// Base abstracts BaseTable
@ -247,12 +253,13 @@ func (gp *BaseTable) Get(key string) string {
return value
}
func (gp *BaseTable) GetByPattern(pattern string) map[string]string {
return gp.mgr.GetConfigsByPattern(pattern, true)
func (gp *BaseTable) GetConfigSubSet(pattern string) map[string]string {
return gp.mgr.GetBy(config.WithPrefix(pattern), config.RemovePrefix(pattern))
}
func (gp *BaseTable) GetConfigSubSet(pattern string) map[string]string {
return gp.mgr.GetConfigsByPattern(pattern, false)
func (gp *BaseTable) GetComponentConfigurations(ctx context.Context, componentName string, sub string) map[string]string {
allownPrefixs := append(globalConfigPrefixs(), componentName+".")
return gp.mgr.GetBy(config.WithSubstr(sub), config.WithOneOfPrefixs(allownPrefixs...))
}
func (gp *BaseTable) GetAll() map[string]string {

View File

@ -131,7 +131,7 @@ func (pg *ParamGroup) GetValue() map[string]string {
if pg.GetFunc != nil {
return pg.GetFunc()
}
values := pg.manager.GetConfigsByPattern(pg.KeyPrefix, false)
values := pg.manager.GetBy(config.WithPrefix(pg.KeyPrefix), config.RemovePrefix(pg.KeyPrefix))
return values
}