Add it for refresh config (#23773)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/24002/head
Enwei Jiao 2023-05-06 17:34:39 +08:00 committed by GitHub
parent aae67f7c5d
commit 086f3bd748
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 217 additions and 22 deletions

View File

@ -80,6 +80,8 @@ func (es EnvSource) GetSourceName() string {
func (es EnvSource) SetEventHandler(eh EventHandler) { func (es EnvSource) SetEventHandler(eh EventHandler) {
}
func (es EnvSource) UpdateOptions(opts Options) {
} }
func (es EnvSource) Close() { func (es EnvSource) Close() {

View File

@ -112,8 +112,25 @@ func (es *EtcdSource) SetEventHandler(eh EventHandler) {
es.configRefresher.eh = eh es.configRefresher.eh = eh
} }
func (es *EtcdSource) UpdateOptions(opts Options) {
if opts.EtcdInfo == nil {
return
}
es.Lock()
defer es.Unlock()
es.keyPrefix = opts.EtcdInfo.KeyPrefix
if es.configRefresher.refreshInterval != opts.EtcdInfo.RefreshInterval {
es.configRefresher.stop()
es.configRefresher = newRefresher(opts.EtcdInfo.RefreshInterval, es.refreshConfigurations)
es.configRefresher.start(es.GetSourceName())
}
}
func (es *EtcdSource) refreshConfigurations() error { func (es *EtcdSource) refreshConfigurations() error {
es.RLock()
prefix := es.keyPrefix + "/config" prefix := es.keyPrefix + "/config"
es.RUnlock()
ctx, cancel := context.WithTimeout(es.ctx, ReadConfigTimeout) ctx, cancel := context.WithTimeout(es.ctx, ReadConfigTimeout)
defer cancel() defer cancel()
response, err := es.etcdCli.Get(ctx, prefix, clientv3.WithPrefix()) response, err := es.etcdCli.Get(ctx, prefix, clientv3.WithPrefix())

View File

@ -94,10 +94,26 @@ func (fs *FileSource) SetEventHandler(eh EventHandler) {
fs.configRefresher.eh = eh fs.configRefresher.eh = eh
} }
func (fs *FileSource) UpdateOptions(opts Options) {
if opts.FileInfo == nil {
return
}
fs.Lock()
defer fs.Unlock()
fs.files = opts.FileInfo.Files
}
func (fs *FileSource) loadFromFile() error { func (fs *FileSource) loadFromFile() error {
yamlReader := viper.New() yamlReader := viper.New()
newConfig := make(map[string]string) newConfig := make(map[string]string)
for _, configFile := range fs.files { var configFiles []string
fs.RLock()
configFiles = fs.files
fs.RUnlock()
for _, configFile := range configFiles {
if _, err := os.Stat(configFile); err != nil { if _, err := os.Stat(configFile); err != nil {
continue continue
} }
@ -135,6 +151,7 @@ func (fs *FileSource) loadFromFile() error {
newConfig[formatKey(key)] = str newConfig[formatKey(key)] = str
} }
} }
fs.Lock() fs.Lock()
defer fs.Unlock() defer fs.Unlock()
err := fs.configRefresher.fireEvents(fs.GetSourceName(), fs.configs, newConfig) err := fs.configRefresher.fireEvents(fs.GetSourceName(), fs.configs, newConfig)

View File

@ -219,6 +219,20 @@ func (m *Manager) ForbidUpdate(key string) {
m.forbiddenKeys.Insert(formatKey(key)) m.forbiddenKeys.Insert(formatKey(key))
} }
func (m *Manager) UpdateSourceOptions(opts ...Option) {
m.Lock()
defer m.Unlock()
var options Options
for _, opt := range opts {
opt(&options)
}
for _, source := range m.sources {
source.UpdateOptions(options)
}
}
// Do not use it directly, only used when add source and unittests. // Do not use it directly, only used when add source and unittests.
func (m *Manager) pullSourceConfigs(source string) error { func (m *Manager) pullSourceConfigs(source string) error {
configSource, ok := m.sources[source] configSource, ok := m.sources[source]

View File

@ -75,3 +75,6 @@ func (ErrSource) GetSourceName() string {
func (e ErrSource) SetEventHandler(eh EventHandler) { func (e ErrSource) SetEventHandler(eh EventHandler) {
} }
func (e ErrSource) UpdateOptions(opt Options) {
}

View File

@ -29,6 +29,7 @@ type Source interface {
GetPriority() int GetPriority() int
GetSourceName() string GetSourceName() string
SetEventHandler(eh EventHandler) SetEventHandler(eh EventHandler)
UpdateOptions(opt Options)
Close() Close()
} }

View File

@ -86,6 +86,10 @@ func (gp *BaseTable) GlobalInitWithYaml(yaml string) {
}) })
} }
func (gp *BaseTable) UpdateSourceOpiotns(opts ...config.Option) {
gp.mgr.UpdateSourceOptions(opts...)
}
// init initializes the param table. // init initializes the param table.
// if refreshInterval greater than 0 will auto refresh config from source // if refreshInterval greater than 0 will auto refresh config from source
func (gp *BaseTable) init(refreshInterval int) { func (gp *BaseTable) init(refreshInterval int) {

View File

@ -183,7 +183,7 @@ type commonConfig struct {
DataNodeSubName ParamItem `refreshable:"false"` DataNodeSubName ParamItem `refreshable:"false"`
DefaultPartitionName ParamItem `refreshable:"false"` DefaultPartitionName ParamItem `refreshable:"false"`
DefaultIndexName ParamItem `refreshable:"false"` DefaultIndexName ParamItem `refreshable:"true"`
RetentionDuration ParamItem `refreshable:"true"` RetentionDuration ParamItem `refreshable:"true"`
EntityExpirationTTL ParamItem `refreshable:"true"` EntityExpirationTTL ParamItem `refreshable:"true"`
@ -413,7 +413,6 @@ func (p *commonConfig) init(base *BaseTable) {
Key: "common.defaultIndexName", Key: "common.defaultIndexName",
Version: "2.0.0", Version: "2.0.0",
DefaultValue: "_default_idx", DefaultValue: "_default_idx",
Forbidden: true,
Doc: "default index name", Doc: "default index name",
Export: true, Export: true,
} }

View File

@ -55,7 +55,7 @@ type ServiceParam struct {
} }
func (p *ServiceParam) init() { func (p *ServiceParam) init() {
p.BaseTable.init(10) p.BaseTable.init(2)
p.LocalStorageCfg.Init(&p.BaseTable) p.LocalStorageCfg.Init(&p.BaseTable)
p.MetaStoreCfg.Init(&p.BaseTable) p.MetaStoreCfg.Init(&p.BaseTable)

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/funcutil"
@ -106,7 +107,6 @@ type MiniCluster struct {
proxy types.ProxyComponent proxy types.ProxyComponent
dataCoord types.DataCoordComponent dataCoord types.DataCoordComponent
rootCoord types.RootCoordComponent rootCoord types.RootCoordComponent
//indexCoord types.IndexCoordComponent
queryCoord types.QueryCoordComponent queryCoord types.QueryCoordComponent
queryNodes []types.QueryNodeComponent queryNodes []types.QueryNodeComponent
@ -116,7 +116,7 @@ type MiniCluster struct {
metaWatcher MetaWatcher metaWatcher MetaWatcher
} }
var Params *paramtable.ComponentParam = paramtable.Get() var params *paramtable.ComponentParam = paramtable.Get()
type Option func(cluster *MiniCluster) type Option func(cluster *MiniCluster)
@ -124,16 +124,19 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
cluster = &MiniCluster{ cluster = &MiniCluster{
ctx: ctx, ctx: ctx,
} }
//Params.InitOnce() params.Init()
Params.Init()
cluster.params = DefaultParams() cluster.params = DefaultParams()
cluster.clusterConfig = DefaultClusterConfig() cluster.clusterConfig = DefaultClusterConfig()
for _, opt := range opts { for _, opt := range opts {
opt(cluster) opt(cluster)
} }
for k, v := range cluster.params { for k, v := range cluster.params {
Params.Save(k, v) params.Save(k, v)
} }
params.UpdateSourceOpiotns(config.WithEtcdSource(&config.EtcdInfo{
KeyPrefix: cluster.params[EtcdRootPath],
RefreshInterval: 2 * time.Second,
}))
if cluster.factory == nil { if cluster.factory == nil {
cluster.factory = dependency.NewDefaultFactory(true) cluster.factory = dependency.NewDefaultFactory(true)
@ -147,13 +150,13 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
if cluster.etcdCli == nil { if cluster.etcdCli == nil {
var etcdCli *clientv3.Client var etcdCli *clientv3.Client
etcdCli, err = etcd.GetEtcdClient( etcdCli, err = etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(), params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(), params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(), params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(), params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(), params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) params.EtcdCfg.EtcdTLSMinVersion.GetValue())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -448,7 +451,7 @@ func (cluster *MiniCluster) Stop() error {
} }
log.Info("mini cluster indexnodes stopped") log.Info("mini cluster indexnodes stopped")
cluster.etcdCli.KV.Delete(cluster.ctx, Params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix()) cluster.etcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
defer cluster.etcdCli.Close() defer cluster.etcdCli.Close()
if cluster.chunkManager == nil { if cluster.chunkManager == nil {
@ -469,9 +472,9 @@ func DefaultParams() map[string]string {
EtcdRootPath: testPath, EtcdRootPath: testPath,
MinioRootPath: testPath, MinioRootPath: testPath,
//"runtime.role": typeutil.StandaloneRole, //"runtime.role": typeutil.StandaloneRole,
Params.IntegrationTestCfg.IntegrationMode.Key: "true", params.IntegrationTestCfg.IntegrationMode.Key: "true",
Params.CommonCfg.StorageType.Key: "local", params.CommonCfg.StorageType.Key: "local",
Params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs
} }
} }

View File

@ -0,0 +1,135 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
"fmt"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestRefreshPasswordLength(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*180)
defer cancel()
c, err := StartMiniCluster(ctx)
assert.NoError(t, err)
err = c.Start()
assert.NoError(t, err)
defer func() {
err = c.Stop()
assert.NoError(t, err)
cancel()
}()
s, err := c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{
Username: "test",
Password: "1234",
})
log.Debug("first create result", zap.Any("state", s))
assert.Equal(t, commonpb.ErrorCode_IllegalArgument, s.GetErrorCode())
params := paramtable.Get()
c.etcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/proxy/minpasswordlength", params.EtcdCfg.RootPath.GetValue()), "3")
assert.Eventually(t, func() bool {
s, err = c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{
Username: "test",
Password: "1234",
})
log.Debug("second create result", zap.Any("state", s))
return commonpb.ErrorCode_Success == s.GetErrorCode()
}, time.Second*20, time.Millisecond*500)
}
func TestRefreshDefaultIndexName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*180)
defer cancel()
c, err := StartMiniCluster(ctx)
assert.NoError(t, err)
err = c.Start()
assert.NoError(t, err)
defer func() {
err = c.Stop()
assert.NoError(t, err)
cancel()
}()
params := paramtable.Get()
c.etcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/common/defaultIndexName", params.EtcdCfg.RootPath.GetValue()), "a_index")
assert.Eventually(t, func() bool {
return params.CommonCfg.DefaultIndexName.GetValue() == "a_index"
}, time.Second*10, time.Millisecond*500)
dim := 128
dbName := "default"
collectionName := "test"
rowNum := 100
schema := constructSchema("test", 128, true)
marshaledSchema, err := proto.Marshal(schema)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: "default",
CollectionName: "test",
Schema: marshaledSchema,
ShardsNum: 1,
})
assert.NoError(t, err)
if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason()))
}
assert.Equal(t, createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
_, err = c.proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
})
assert.NoError(t, err)
_, err = c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2),
})
s, err := c.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
DbName: dbName,
CollectionName: collectionName,
})
assert.Equal(t, commonpb.ErrorCode_Success, s.Status.GetErrorCode())
assert.Equal(t, 1, len(s.IndexDescriptions))
assert.Equal(t, "a_index_101", s.IndexDescriptions[0].GetIndexName())
}