enhance: cache config values for saving cpu cycles to parse config item (#30947)

related: #30958

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/31197/head
Chun Han 2024-03-12 11:09:04 +08:00 committed by GitHub
parent 69e132e05b
commit 3298e64bd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 213 additions and 28 deletions

View File

@ -216,6 +216,10 @@ func (node *QueryNode) InitSegcore() error {
C.InitCpuNum(cCPUNum)
knowhereBuildPoolSize := uint32(float32(paramtable.Get().QueryNodeCfg.InterimIndexBuildParallelRate.GetAsFloat()) * float32(hardware.GetCPUNum()))
if knowhereBuildPoolSize < uint32(1) {
knowhereBuildPoolSize = uint32(1)
}
log.Info("set up knowhere build pool size", zap.Uint32("pool_size", knowhereBuildPoolSize))
cKnowhereBuildPoolSize := C.uint32_t(knowhereBuildPoolSize)
C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize)

View File

@ -84,16 +84,36 @@ type Manager struct {
keySourceMap *typeutil.ConcurrentMap[string, string] // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file
overlays *typeutil.ConcurrentMap[string, string] // store the highest priority configs which modified at runtime
forbiddenKeys *typeutil.ConcurrentSet[string]
configCache *typeutil.ConcurrentMap[string, interface{}]
}
func NewManager() *Manager {
return &Manager{
manager := &Manager{
Dispatcher: NewEventDispatcher(),
sources: typeutil.NewConcurrentMap[string, Source](),
keySourceMap: typeutil.NewConcurrentMap[string, string](),
overlays: typeutil.NewConcurrentMap[string, string](),
forbiddenKeys: typeutil.NewConcurrentSet[string](),
configCache: typeutil.NewConcurrentMap[string, interface{}](),
}
resetConfigCacheFunc := NewHandler("reset.config.cache", func(event *Event) {
keyToRemove := strings.NewReplacer("/", ".").Replace(event.Key)
manager.configCache.Remove(keyToRemove)
})
manager.Dispatcher.RegisterForKeyPrefix("", resetConfigCacheFunc)
return manager
}
func (m *Manager) GetCachedValue(key string) (interface{}, bool) {
return m.configCache.Get(key)
}
func (m *Manager) SetCachedValue(key string, value interface{}) {
m.configCache.Insert(key, value)
}
func (m *Manager) EvictCachedValue(key string) {
m.configCache.Remove(key)
}
func (m *Manager) GetConfig(key string) (string, error) {

View File

@ -146,32 +146,27 @@ func TestOnEvent(t *testing.T) {
KeyPrefix: "test",
RefreshInterval: 10 * time.Millisecond,
}))
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second)
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "aaa")
ctx := context.Background()
client.KV.Put(ctx, "test/config/a/b", "bbb")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "bbb")
client.KV.Put(ctx, "test/config/a/b", "ccc")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600)
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
client.KV.Delete(ctx, "test/config/a/b")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
@ -201,6 +196,61 @@ func TestDeadlock(t *testing.T) {
wg.Wait()
}
func TestCachedConfig(t *testing.T) {
cfg, _ := embed.ConfigFromFile("../../configs/advanced/etcd.yaml")
cfg.Dir = "/tmp/milvus/test"
e, err := embed.StartEtcd(cfg)
assert.NoError(t, err)
defer e.Close()
defer os.RemoveAll(cfg.Dir)
dir, _ := os.MkdirTemp("", "milvus")
yamlFile := path.Join(dir, "milvus.yaml")
mgr, _ := Init(WithEnvSource(formatKey),
WithFilesSource(&FileInfo{
Files: []string{yamlFile},
RefreshInterval: 10 * time.Millisecond,
}),
WithEtcdSource(&EtcdInfo{
Endpoints: []string{cfg.ACUrls[0].Host},
KeyPrefix: "test",
RefreshInterval: 10 * time.Millisecond,
}))
// test get cached value from file
{
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second)
_, exist := mgr.GetCachedValue("a.b")
assert.False(t, exist)
mgr.SetCachedValue("a.b", "aaa")
val, exist := mgr.GetCachedValue("a.b")
assert.True(t, exist)
assert.Equal(t, "aaa", val.(string))
// after refresh, the cached value should be reset
os.WriteFile(yamlFile, []byte("a.b: xxx"), 0o600)
time.Sleep(time.Second)
_, exist = mgr.GetCachedValue("a.b")
assert.False(t, exist)
}
client := v3client.New(e.Server)
{
_, exist := mgr.GetCachedValue("c.d")
assert.False(t, exist)
mgr.SetCachedValue("cd", "xxx")
val, exist := mgr.GetCachedValue("cd")
assert.True(t, exist)
assert.Equal(t, "xxx", val.(string))
// after refresh, the cached value should be reset
ctx := context.Background()
client.KV.Put(ctx, "test/config/c/d", "www")
time.Sleep(time.Second)
_, exist = mgr.GetCachedValue("cd")
assert.False(t, exist)
}
}
type ErrSource struct{}
func (e ErrSource) Close() {

View File

@ -253,12 +253,14 @@ func (bt *BaseTable) GetWithDefault(key, defaultValue string) string {
// Remove Config by key
func (bt *BaseTable) Remove(key string) error {
bt.mgr.DeleteConfig(key)
bt.mgr.EvictCachedValue(key)
return nil
}
// Update Config
func (bt *BaseTable) Save(key, value string) error {
bt.mgr.SetConfig(key, value)
bt.mgr.EvictCachedValue(key)
return nil
}
@ -272,5 +274,6 @@ func (bt *BaseTable) SaveGroup(group map[string]string) error {
// Reset Config to default value
func (bt *BaseTable) Reset(key string) error {
bt.mgr.ResetConfig(key)
bt.mgr.EvictCachedValue(key)
return nil
}

View File

@ -1952,7 +1952,7 @@ type queryNodeConfig struct {
InterimIndexNlist ParamItem `refreshable:"false"`
InterimIndexNProbe ParamItem `refreshable:"false"`
InterimIndexMemExpandRate ParamItem `refreshable:"false"`
InterimIndexBuildParallelRate ParamItem `refreshable:"true"`
InterimIndexBuildParallelRate ParamItem `refreshable:"false"`
// memory limit
LoadMemoryUsageFactor ParamItem `refreshable:"true"`

View File

@ -284,6 +284,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt())
assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt())
params.Save(Params.BalanceCheckInterval.Key, "10000")
assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt())
assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt())
assert.Equal(t, 3, Params.CollectionRecoverTimesLimit.GetAsInt())
@ -470,3 +471,32 @@ func TestForbiddenItem(t *testing.T) {
})
assert.Equal(t, "by-dev", params.CommonCfg.ClusterPrefix.GetValue())
}
func TestCachedParam(t *testing.T) {
Init()
params := Get()
assert.True(t, params.IndexNodeCfg.EnableDisk.GetAsBool())
assert.True(t, params.IndexNodeCfg.EnableDisk.GetAsBool())
assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt())
assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt())
assert.Equal(t, int32(16), params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
assert.Equal(t, int32(16), params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64())
assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64())
assert.Equal(t, 0.85, params.QuotaConfig.DataNodeMemoryLowWaterLevel.GetAsFloat())
assert.Equal(t, 0.85, params.QuotaConfig.DataNodeMemoryLowWaterLevel.GetAsFloat())
assert.Equal(t, 1*time.Hour, params.DataCoordCfg.GCInterval.GetAsDuration(time.Second))
assert.Equal(t, 1*time.Hour, params.DataCoordCfg.GCInterval.GetAsDuration(time.Second))
}

View File

@ -128,23 +128,23 @@ func TestGrpcClientParams(t *testing.T) {
base.Save("grpc.client.maxMaxAttempts", "4")
assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), 4)
assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff)
base.Save("grpc.client.initialBackOff", "a")
assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff)
base.Save("grpc.client.initialBackOff", "2.0")
assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), 2.0)
assert.Equal(t, DefaultInitialBackoff, clientConfig.InitialBackoff.GetAsFloat())
base.Save(clientConfig.InitialBackoff.Key, "a")
assert.Equal(t, DefaultInitialBackoff, clientConfig.InitialBackoff.GetAsFloat())
base.Save(clientConfig.InitialBackoff.Key, "2.0")
assert.Equal(t, 2.0, clientConfig.InitialBackoff.GetAsFloat())
assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), DefaultMaxBackoff)
base.Save("grpc.client.maxBackOff", "a")
base.Save(clientConfig.MaxBackoff.Key, "a")
assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), DefaultMaxBackoff)
base.Save("grpc.client.maxBackOff", "50.0")
assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), 50.0)
base.Save(clientConfig.MaxBackoff.Key, "50.0")
assert.Equal(t, 50.0, clientConfig.MaxBackoff.GetAsFloat())
assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled)
base.Save("grpc.client.CompressionEnabled", "a")
assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled)
base.Save("grpc.client.CompressionEnabled", "true")
assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), true)
base.Save(clientConfig.CompressionEnabled.Key, "true")
assert.Equal(t, true, clientConfig.CompressionEnabled.GetAsBool())
assert.Equal(t, clientConfig.MinResetInterval.GetValue(), "1000")
base.Save("grpc.client.minResetInterval", "abc")

View File

@ -85,6 +85,7 @@ func (pi *ParamItem) SwapTempValue(s string) *string {
if s == "" {
return pi.tempValue.Swap(nil)
}
pi.manager.EvictCachedValue(pi.Key)
return pi.tempValue.Swap(&s)
}
@ -94,47 +95,124 @@ func (pi *ParamItem) GetValue() string {
}
func (pi *ParamItem) GetAsStrings() []string {
return getAsStrings(pi.GetValue())
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if strings, ok := val.([]string); ok {
return strings
}
}
realStrs := getAsStrings(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, realStrs)
return realStrs
}
func (pi *ParamItem) GetAsBool() bool {
return getAsBool(pi.GetValue())
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if boolVal, ok := val.(bool); ok {
return boolVal
}
}
boolVal := getAsBool(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, boolVal)
return boolVal
}
func (pi *ParamItem) GetAsInt() int {
return getAsInt(pi.GetValue())
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if intVal, ok := val.(int); ok {
return intVal
}
}
intVal := getAsInt(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, intVal)
return intVal
}
func (pi *ParamItem) GetAsInt32() int32 {
return int32(getAsInt64(pi.GetValue()))
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if int32Val, ok := val.(int32); ok {
return int32Val
}
}
int32Val := int32(getAsInt64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, int32Val)
return int32Val
}
func (pi *ParamItem) GetAsUint() uint {
return uint(getAsUint64(pi.GetValue()))
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if uintVal, ok := val.(uint); ok {
return uintVal
}
}
uintVal := uint(getAsUint64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, uintVal)
return uintVal
}
func (pi *ParamItem) GetAsUint32() uint32 {
return uint32(getAsUint64(pi.GetValue()))
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if uint32Val, ok := val.(uint32); ok {
return uint32Val
}
}
uint32Val := uint32(getAsUint64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, uint32Val)
return uint32Val
}
func (pi *ParamItem) GetAsUint64() uint64 {
return getAsUint64(pi.GetValue())
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if uint64Val, ok := val.(uint64); ok {
return uint64Val
}
}
uint64Val := getAsUint64(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, uint64Val)
return uint64Val
}
func (pi *ParamItem) GetAsUint16() uint16 {
return uint16(getAsUint64(pi.GetValue()))
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if uint16Val, ok := val.(uint16); ok {
return uint16Val
}
}
uint16Val := uint16(getAsUint64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, uint16Val)
return uint16Val
}
func (pi *ParamItem) GetAsInt64() int64 {
return getAsInt64(pi.GetValue())
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if int64Val, ok := val.(int64); ok {
return int64Val
}
}
int64Val := getAsInt64(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, int64Val)
return int64Val
}
func (pi *ParamItem) GetAsFloat() float64 {
return getAsFloat(pi.GetValue())
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if floatVal, ok := val.(float64); ok {
return floatVal
}
}
floatVal := getAsFloat(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, floatVal)
return floatVal
}
func (pi *ParamItem) GetAsDuration(unit time.Duration) time.Duration {
return getAsDuration(pi.GetValue(), unit)
if val, exist := pi.manager.GetCachedValue(pi.Key); exist {
if durationVal, ok := val.(time.Duration); ok {
return durationVal
}
}
durationVal := getAsDuration(pi.GetValue(), unit)
pi.manager.SetCachedValue(pi.Key, durationVal)
return durationVal
}
func (pi *ParamItem) GetAsJSONMap() map[string]string {