diff --git a/cmd/milvus/mck.go b/cmd/milvus/mck.go index 34fe326b8b..c07758f7a7 100644 --- a/cmd/milvus/mck.go +++ b/cmd/milvus/mck.go @@ -210,7 +210,7 @@ func (c *mck) formatFlags(args []string, flags *flag.FlagSet) { } func (c *mck) connectEctd() { - c.params.Init() + c.params.Init(paramtable.NewBaseTable()) var etcdCli *clientv3.Client var err error if c.etcdIP != "" { diff --git a/cmd/milvus/run.go b/cmd/milvus/run.go index 12b5efbb17..b898e2d1c1 100644 --- a/cmd/milvus/run.go +++ b/cmd/milvus/run.go @@ -14,7 +14,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/metricsinfo" - "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -55,8 +54,8 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { // make go ignore SIGPIPE when all cgo threads set mask of SIGPIPE signal.Ignore(syscall.SIGPIPE) - var local = false role := roles.NewMilvusRoles() + role.Local = false switch c.serverType { case typeutil.RootCoordRole: role.EnableRootCoord = true @@ -83,7 +82,8 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { role.EnableDataNode = true role.EnableIndexCoord = true role.EnableIndexNode = true - local = true + role.Local = true + role.Embedded = c.serverType == typeutil.EmbeddedRole case roleMixture: role.EnableRootCoord = c.enableRootCoord role.EnableQueryCoord = c.enableQueryCoord @@ -98,12 +98,6 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { os.Exit(-1) } - // setup config for embedded milvus - if c.serverType == typeutil.EmbeddedRole { - var params paramtable.BaseTable - params.GlobalInitWithYaml("embedded-milvus.yaml") - } - runtimeDir := createRuntimeDir(c.serverType) filename := getPidFileName(c.serverType, c.svrAlias) @@ -114,7 +108,7 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { panic(err) } defer removePidFile(lock) - role.Run(local, c.svrAlias) + role.Run(c.svrAlias) } func (c *run) formatFlags(args []string, flags *flag.FlagSet) { diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 2f9629b8f7..550b575719 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -132,8 +132,10 @@ type MilvusRoles struct { EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"` EnableIndexNode bool `env:"ENABLE_INDEX_NODE"` - closed chan struct{} - once sync.Once + Local bool + Embedded bool + closed chan struct{} + once sync.Once } // NewMilvusRoles creates a new MilvusRoles with private fields initialized. @@ -280,7 +282,7 @@ func (mr *MilvusRoles) handleSignals() func() { } // Run Milvus components. -func (mr *MilvusRoles) Run(local bool, alias string) { +func (mr *MilvusRoles) Run(alias string) { // start signal handler, defer close func closeFn := mr.handleSignals() defer closeFn() @@ -292,12 +294,17 @@ func (mr *MilvusRoles) Run(local bool, alias string) { mr.printLDPreLoad() // only standalone enable localMsg - if local { + if mr.Local { if err := os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode); err != nil { log.Error("Failed to set deploy mode: ", zap.Error(err)) } - paramtable.Init() + if mr.Embedded { + // setup config for embedded milvus + paramtable.InitWithBaseTable(paramtable.NewBaseTable(paramtable.Files([]string{"embedded-milvus.yaml"}))) + } else { + paramtable.Init() + } params := paramtable.Get() if params.EtcdCfg.UseEmbedEtcd.GetAsBool() { // Start etcd server. @@ -321,6 +328,7 @@ func (mr *MilvusRoles) Run(local bool, alias string) { var rc *components.RootCoord var wg sync.WaitGroup + local := mr.Local if mr.EnableRootCoord { rc = mr.runRootCoord(ctx, local, &wg) } diff --git a/cmd/tools/config/generate.go b/cmd/tools/config/generate.go index 65c0f256d1..a30f2a46c8 100644 --- a/cmd/tools/config/generate.go +++ b/cmd/tools/config/generate.go @@ -26,7 +26,7 @@ type DocContent struct { func collect() []DocContent { params := ¶mtable.ComponentParam{} - params.Init() + params.Init(paramtable.NewBaseTable()) val := reflect.ValueOf(params).Elem() data := make([]DocContent, 0) diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 12abf7f99d..4ef5de05d7 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -622,7 +622,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { collectionID int64 compactTime *compactTime } - Params.Init() + paramtable.Init() vecFieldID := int64(201) segmentInfos := &SegmentsInfo{ segments: make(map[UniqueID]*SegmentInfo), @@ -829,7 +829,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { collectionID int64 compactTime *compactTime } - Params.Init() + paramtable.Init() Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4" vecFieldID := int64(201) tests := []struct { @@ -971,7 +971,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { collectionID int64 compactTime *compactTime } - Params.Init() + paramtable.Init() vecFieldID := int64(201) genSeg := func(segID, numRows int64) *datapb.SegmentInfo { @@ -1154,7 +1154,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { collectionID int64 compactTime *compactTime } - Params.Init() + paramtable.Init() vecFieldID := int64(201) genSeg := func(segID, numRows int64) *datapb.SegmentInfo { @@ -1337,7 +1337,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { collectionID int64 compactTime *compactTime } - Params.Init() + paramtable.Init() vecFieldID := int64(201) genSeg := func(segID, numRows int64) *datapb.SegmentInfo { @@ -1516,7 +1516,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { collectionID int64 compactTime *compactTime } - Params.Init() + paramtable.Init() segmentInfos := &SegmentsInfo{ segments: make(map[UniqueID]*SegmentInfo), @@ -1688,7 +1688,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { // Test shouldDoSingleCompaction func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { - Params.Init() + paramtable.Init() trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler()) diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 3f2262db35..97847bb190 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -46,6 +46,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func Test_garbageCollector_basic(t *testing.T) { @@ -243,7 +244,7 @@ func Test_garbageCollector_scan(t *testing.T) { // initialize unit test sso env func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, inserts []string, stats []string, delta []string, other []string, err error) { - Params.Init() + paramtable.Init() cli, err := minio.New(Params.MinioCfg.Address.GetValue(), &minio.Options{ Creds: credentials.NewStaticV4(Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), ""), Secure: Params.MinioCfg.UseSSL.GetAsBool(), diff --git a/internal/datacoord/index_builder_test.go b/internal/datacoord/index_builder_test.go index 68871bb934..80c6510c78 100644 --- a/internal/datacoord/index_builder_test.go +++ b/internal/datacoord/index_builder_test.go @@ -603,7 +603,7 @@ func TestIndexBuilder(t *testing.T) { nodeID = UniqueID(700) ) - Params.Init() + paramtable.Init() ctx := context.Background() catalog := catalogmocks.NewDataCoordCatalog(t) catalog.On("CreateSegmentIndex", @@ -676,7 +676,7 @@ func TestIndexBuilder(t *testing.T) { } func TestIndexBuilder_Error(t *testing.T) { - Params.Init() + paramtable.Init() sc := catalogmocks.NewDataCoordCatalog(t) sc.On("AlterSegmentIndexes", diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 3db3490764..6f6b016333 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/testutils" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" @@ -156,7 +157,7 @@ type MetaBasicSuite struct { } func (suite *MetaBasicSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *MetaBasicSuite) SetupTest() { @@ -216,8 +217,6 @@ func TestMeta_Basic(t *testing.T) { testSchema := newTestSchema() - Params.Init() - collInfo := &collectionInfo{ ID: collID, Schema: testSchema, diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 532e79d50d..592aa21ac6 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -35,11 +35,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/metautil" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestManagerOptions(t *testing.T) { // ctx := context.Background() - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -99,7 +100,7 @@ func TestManagerOptions(t *testing.T) { func TestAllocSegment(t *testing.T) { ctx := context.Background() - Params.Init() + paramtable.Init() Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1") mockAllocator := newMockAllocator() meta, err := newMemoryMeta() @@ -145,7 +146,7 @@ func TestAllocSegment(t *testing.T) { func TestLastExpireReset(t *testing.T) { //set up meta on dc ctx := context.Background() - Params.Init() + paramtable.Init() Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1") Params.Save(Params.DataCoordCfg.SegmentMaxSize.Key, "1") mockAllocator := newRootCoordAllocator(newMockRootCoordService()) @@ -229,7 +230,7 @@ func TestLastExpireReset(t *testing.T) { func TestAllocSegmentForImport(t *testing.T) { ctx := context.Background() - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -270,7 +271,7 @@ func TestAllocSegmentForImport(t *testing.T) { func TestLoadSegmentsFromMeta(t *testing.T) { ctx := context.Background() - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -320,7 +321,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) { } func TestSaveSegmentsToMeta(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -342,7 +343,7 @@ func TestSaveSegmentsToMeta(t *testing.T) { } func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -364,7 +365,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) { } func TestDropSegment(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -387,7 +388,7 @@ func TestDropSegment(t *testing.T) { } func TestAllocRowsLargerThanOneSegment(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -409,7 +410,7 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) { } func TestExpireAllocation(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -452,7 +453,7 @@ func TestExpireAllocation(t *testing.T) { func TestCleanExpiredBulkloadSegment(t *testing.T) { t.Run("expiredBulkloadSegment", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -482,7 +483,7 @@ func TestCleanExpiredBulkloadSegment(t *testing.T) { func TestGetFlushableSegments(t *testing.T) { t.Run("get flushable segments between small interval", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -528,7 +529,7 @@ func TestGetFlushableSegments(t *testing.T) { func TestTryToSealSegment(t *testing.T) { t.Run("normal seal with segment policies", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -553,7 +554,7 @@ func TestTryToSealSegment(t *testing.T) { }) t.Run("normal seal with channel seal policies", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -578,7 +579,7 @@ func TestTryToSealSegment(t *testing.T) { }) t.Run("normal seal with both segment & channel seal policy", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -605,7 +606,7 @@ func TestTryToSealSegment(t *testing.T) { }) t.Run("test sealByMaxBinlogFileNumberPolicy", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) @@ -690,7 +691,7 @@ func TestTryToSealSegment(t *testing.T) { }) t.Run("seal with segment policy with kv fails", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() memoryKV := NewMetaMemoryKV() catalog := datacoord.NewCatalog(memoryKV, "", "") @@ -719,7 +720,7 @@ func TestTryToSealSegment(t *testing.T) { }) t.Run("seal with channel policy with kv fails", func(t *testing.T) { - Params.Init() + paramtable.Init() mockAllocator := newMockAllocator() memoryKV := NewMetaMemoryKV() catalog := datacoord.NewCatalog(memoryKV, "", "") diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 9d1b881a33..b730c5b8e8 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3684,8 +3684,8 @@ func TestDataCoord_Import(t *testing.T) { t.Run("no datanode available", func(t *testing.T) { svr := newTestServer(t, nil) - Params.BaseTable.Save("minio.address", "minio:9000") - defer Params.BaseTable.Reset("minio.address") + Params.Save("minio.address", "minio:9000") + defer Params.Reset("minio.address") resp, err := svr.Import(svr.ctx, &datapb.ImportTaskRequest{ ImportTask: &datapb.ImportTask{ CollectionId: 100, diff --git a/internal/datanode/buffer_test.go b/internal/datanode/buffer_test.go index 58bba302ac..742e6ac7f6 100644 --- a/internal/datanode/buffer_test.go +++ b/internal/datanode/buffer_test.go @@ -245,7 +245,7 @@ func TestUpdateCompactedSegments(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-showDelBuf" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") channel := ChannelMeta{ segments: make(map[UniqueID]*Segment), diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 30295c8495..fb006af9e4 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -66,7 +66,7 @@ func TestMain(t *testing.M) { os.Setenv("ROCKSMQ_PATH", path) defer os.RemoveAll(path) - Params.Init() + paramtable.Init() // change to specific channel for test paramtable.Get().Save(Params.EtcdCfg.Endpoints.Key, strings.Join(addrs, ",")) paramtable.Get().Save(Params.CommonCfg.DataCoordTimeTick.Key, Params.CommonCfg.DataCoordTimeTick.GetValue()+strconv.Itoa(rand.Int())) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 74f88d2408..fdea3db63d 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -49,7 +49,7 @@ import ( var dataSyncServiceTestDir = "/tmp/milvus_test/data_sync_service" func init() { - Params.Init() + paramtable.Init() } func getVchanInfo(info *testInfo) *datapb.VchannelInfo { diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 815e2031b2..0d237f1efc 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -255,7 +255,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-operate" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") c := &nodeConfig{ channel: channel, @@ -281,7 +281,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-operate" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") c := &nodeConfig{ channel: channel, @@ -313,7 +313,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-operate" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") c := &nodeConfig{ channel: channel, @@ -406,7 +406,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-autoflush" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") c := &nodeConfig{ channel: channel, @@ -507,7 +507,7 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-showDelBuf" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") channel := &ChannelMeta{ segments: make(map[UniqueID]*Segment), diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 7864b50abc..485d919825 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -77,7 +77,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) @@ -167,7 +167,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) @@ -330,7 +330,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) @@ -577,7 +577,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) @@ -959,7 +959,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root") + Params.Save("etcd.rootPath", "/test/datanode/root") Factory := &MetaFactory{} tests := []struct { diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 80db897533..358575ea57 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -207,10 +207,9 @@ func TestFlowGraphManager(t *testing.T) { fm.dropAll() const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-" - var baseParams = &Params.BaseTable - baseParams.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, fmt.Sprintf("%t", true)) + Params.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, fmt.Sprintf("%t", true)) for _, test := range tests { - baseParams.Save(Params.DataNodeCfg.MemoryWatermark.Key, fmt.Sprintf("%f", test.watermark)) + Params.Save(Params.DataNodeCfg.MemoryWatermark.Key, fmt.Sprintf("%f", test.watermark)) for i, memorySize := range test.memorySizes { vchannel := fmt.Sprintf("%s%d", channelPrefix, i) vchan := &datapb.VchannelInfo{ diff --git a/internal/datanode/segment_sync_policy_test.go b/internal/datanode/segment_sync_policy_test.go index 8abf206ff5..e54eca3a11 100644 --- a/internal/datanode/segment_sync_policy_test.go +++ b/internal/datanode/segment_sync_policy_test.go @@ -85,8 +85,7 @@ func TestSyncMemoryTooHigh(t *testing.T) { for _, test := range tests { t.Run(test.testName, func(t *testing.T) { - var baseParams = &Params.BaseTable - baseParams.Save(Params.DataNodeCfg.MemoryForceSyncSegmentNum.Key, fmt.Sprintf("%d", test.syncSegmentNum)) + Params.Save(Params.DataNodeCfg.MemoryForceSyncSegmentNum.Key, fmt.Sprintf("%d", test.syncSegmentNum)) policy := syncMemoryTooHigh() segments := make([]*Segment, len(test.memorySizesInMB)) for i := range segments { diff --git a/internal/distributed/connection_manager_test.go b/internal/distributed/connection_manager_test.go index 408638d652..30703f814c 100644 --- a/internal/distributed/connection_manager_test.go +++ b/internal/distributed/connection_manager_test.go @@ -267,7 +267,7 @@ type testIndexNode struct { } func initSession(ctx context.Context) *sessionutil.Session { - baseTable := ¶mtable.Get().BaseTable + baseTable := paramtable.GetBaseTable() rootPath, err := baseTable.Load("etcd.rootPath") if err != nil { panic(err) diff --git a/internal/distributed/datacoord/client/client_test.go b/internal/distributed/datacoord/client/client_test.go index f2394ed5f4..67fe3b25e3 100644 --- a/internal/distributed/datacoord/client/client_test.go +++ b/internal/distributed/datacoord/client/client_test.go @@ -56,8 +56,6 @@ func TestMain(m *testing.M) { } func Test_NewClient(t *testing.T) { - proxy.Params.Init() - ctx := context.Background() etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), diff --git a/internal/distributed/datanode/client/client_test.go b/internal/distributed/datanode/client/client_test.go index 51ebda69b5..4b50a1f09d 100644 --- a/internal/distributed/datanode/client/client_test.go +++ b/internal/distributed/datanode/client/client_test.go @@ -22,16 +22,16 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/util/mock" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/internal/proto/datapb" "google.golang.org/grpc" - "github.com/milvus-io/milvus/internal/proxy" "github.com/stretchr/testify/assert" ) func Test_NewClient(t *testing.T) { - proxy.Params.Init() + paramtable.Init() ctx := context.Background() client, err := NewClient(ctx, "", 1) assert.Nil(t, client) diff --git a/internal/distributed/indexnode/service_test.go b/internal/distributed/indexnode/service_test.go index 4d09c9fd37..5a7d877c71 100644 --- a/internal/distributed/indexnode/service_test.go +++ b/internal/distributed/indexnode/service_test.go @@ -44,7 +44,7 @@ func TestIndexNodeServer(t *testing.T) { assert.NotNil(t, server) inm := indexnode.NewIndexNodeMock() - ParamsGlobal.Init() + ParamsGlobal.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) etcdCli, err := etcd.GetEtcdClient( ParamsGlobal.EtcdCfg.UseEmbedEtcd.GetAsBool(), ParamsGlobal.EtcdCfg.EtcdUseSSL.GetAsBool(), diff --git a/internal/distributed/proxy/client/client_test.go b/internal/distributed/proxy/client/client_test.go index e6389d635f..bece027060 100644 --- a/internal/distributed/proxy/client/client_test.go +++ b/internal/distributed/proxy/client/client_test.go @@ -23,15 +23,15 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/util/mock" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/internal/proto/proxypb" - "github.com/milvus-io/milvus/internal/proxy" "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) func Test_NewClient(t *testing.T) { - proxy.Params.Init() + paramtable.Init() ctx := context.Background() client, err := NewClient(ctx, "", 1) diff --git a/internal/distributed/querycoord/client/client_test.go b/internal/distributed/querycoord/client/client_test.go index 4d3d48034b..ead3d65eaa 100644 --- a/internal/distributed/querycoord/client/client_test.go +++ b/internal/distributed/querycoord/client/client_test.go @@ -57,8 +57,6 @@ func TestMain(m *testing.M) { } func Test_NewClient(t *testing.T) { - proxy.Params.Init() - ctx := context.Background() etcdCli, err := etcd.GetEtcdClient( diff --git a/internal/distributed/rootcoord/client/client_test.go b/internal/distributed/rootcoord/client/client_test.go index b1dab0072f..3d339a4356 100644 --- a/internal/distributed/rootcoord/client/client_test.go +++ b/internal/distributed/rootcoord/client/client_test.go @@ -57,8 +57,6 @@ func TestMain(m *testing.M) { } func Test_NewClient(t *testing.T) { - proxy.Params.Init() - ctx := context.Background() etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index b569f017ae..78c0d187ad 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -180,8 +180,7 @@ func TestRun(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() rootPath := fmt.Sprintf("/%d/test", randVal) - rootcoord.Params.BaseTable.Save("etcd.rootPath", rootPath) - rootcoord.Params.Init() + rootcoord.Params.Save("etcd.rootPath", rootPath) etcdCli, err := etcd.GetEtcdClient( etcdConfig.UseEmbedEtcd.GetAsBool(), diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index f3fcdd77a0..64e3677231 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -145,7 +145,7 @@ func (i *IndexNode) Register() error { } func (i *IndexNode) initSegcore() { - cGlogConf := C.CString(path.Join(Params.BaseTable.GetConfigDir(), paramtable.DefaultGlogConf)) + cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf)) C.IndexBuilderInit(cGlogConf) C.free(unsafe.Pointer(cGlogConf)) diff --git a/internal/indexnode/indexnode_component_mock.go b/internal/indexnode/indexnode_component_mock.go index a5e8a45054..d64ea843d3 100644 --- a/internal/indexnode/indexnode_component_mock.go +++ b/internal/indexnode/indexnode_component_mock.go @@ -4,6 +4,7 @@ import ( "context" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type mockIndexNodeComponent struct { @@ -13,7 +14,7 @@ type mockIndexNodeComponent struct { var _ types.IndexNodeComponent = &mockIndexNodeComponent{} func NewMockIndexNodeComponent(ctx context.Context) (types.IndexNodeComponent, error) { - Params.Init() + paramtable.Init() factory := &mockFactory{ chunkMgr: &mockChunkmgr{}, } diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index f068f97a63..a10c3eb587 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/stretchr/testify/assert" ) @@ -460,7 +461,7 @@ func TestComponentState(t *testing.T) { } ctx = context.TODO() ) - Params.Init() + paramtable.Init() in := NewIndexNode(ctx, factory) in.SetEtcdClient(getEtcdClient()) state, err := in.GetComponentStates(ctx) @@ -495,7 +496,7 @@ func TestGetTimeTickChannel(t *testing.T) { } ctx = context.TODO() ) - Params.Init() + paramtable.Init() in := NewIndexNode(ctx, factory) ret, err := in.GetTimeTickChannel(ctx) assert.NoError(t, err) @@ -509,7 +510,7 @@ func TestGetStatisticChannel(t *testing.T) { } ctx = context.TODO() ) - Params.Init() + paramtable.Init() in := NewIndexNode(ctx, factory) ret, err := in.GetStatisticsChannel(ctx) @@ -524,7 +525,7 @@ func TestIndexTaskWhenStoppingNode(t *testing.T) { } ctx = context.TODO() ) - Params.Init() + paramtable.Init() in := NewIndexNode(ctx, factory) in.loadOrStoreTask("cluster-1", 1, &taskInfo{ @@ -558,7 +559,7 @@ func TestGetSetAddress(t *testing.T) { } ctx = context.TODO() ) - Params.Init() + paramtable.Init() in := NewIndexNode(ctx, factory) in.SetAddress("address") assert.Equal(t, "address", in.GetAddress()) diff --git a/internal/indexnode/task_scheduler_test.go b/internal/indexnode/task_scheduler_test.go index c7bd82e2da..46b6b9ac52 100644 --- a/internal/indexnode/task_scheduler_test.go +++ b/internal/indexnode/task_scheduler_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/stretchr/testify/assert" ) @@ -155,7 +156,7 @@ func newTask(cancelStage fakeTaskState, reterror map[fakeTaskState]error, expect } func TestIndexTaskScheduler(t *testing.T) { - Params.Init() + paramtable.Init() scheduler := NewTaskScheduler(context.TODO()) scheduler.Start() diff --git a/internal/kv/etcd/embed_etcd_config_test.go b/internal/kv/etcd/embed_etcd_config_test.go index bab2f0f225..f75aff241f 100644 --- a/internal/kv/etcd/embed_etcd_config_test.go +++ b/internal/kv/etcd/embed_etcd_config_test.go @@ -36,7 +36,7 @@ func TestEtcdConfigLoad(te *testing.T) { te.Setenv("etcd.config.path", "../../../configs/advanced/etcd.yaml") te.Setenv("etcd.data.dir", "etcd.test.data.dir") - param.Init() + param.Init(paramtable.NewBaseTable()) //clean up data defer func() { os.RemoveAll("etcd.test.data.dir") diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 752f660621..314762f03f 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -40,7 +40,7 @@ func TestEmbedEtcd(te *testing.T) { dir := te.TempDir() te.Setenv("etcd.data.dir", dir) - param.Init() + param.Init(paramtable.NewBaseTable()) te.Run("etcdKV SaveAndLoad", func(t *testing.T) { rootPath := "/etcd/test/root/saveandload" diff --git a/internal/kv/etcd/embed_etcd_restart_test.go b/internal/kv/etcd/embed_etcd_restart_test.go index c897e21568..85d918d8e5 100644 --- a/internal/kv/etcd/embed_etcd_restart_test.go +++ b/internal/kv/etcd/embed_etcd_restart_test.go @@ -33,9 +33,9 @@ func TestEtcdRestartLoad(te *testing.T) { te.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) te.Setenv("ETCD_USE_EMBED", "true") param := new(paramtable.ComponentParam) - param.Init() - param.BaseTable.Save("etcd.config.path", "../../../configs/advanced/etcd.yaml") - param.BaseTable.Save("etcd.data.dir", etcdDataDir) + param.Init(paramtable.NewBaseTable()) + param.Save("etcd.config.path", "../../../configs/advanced/etcd.yaml") + param.Save("etcd.data.dir", etcdDataDir) //clean up data defer func() { err := os.RemoveAll(etcdDataDir) diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 5f1c8d4b77..2548c0bf45 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -34,10 +34,10 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) -var Params paramtable.ComponentParam +var Params = paramtable.Get() func TestMain(m *testing.M) { - Params.Init() + paramtable.Init() code := m.Run() os.Exit(code) } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 5d18673721..856706de1f 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -1064,7 +1064,7 @@ func TestCatalog_DropSegmentIndex(t *testing.T) { } func BenchmarkCatalog_List1000Segments(b *testing.B) { - Params.Init() + paramtable.Init() etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), Params.EtcdCfg.EtcdUseSSL.GetAsBool(), @@ -1170,7 +1170,7 @@ func addSegment(rootPath string, collectionID, partitionID, segmentID, fieldID i var Params = paramtable.Get() func TestMain(m *testing.M) { - Params.Init() + paramtable.Init() code := m.Run() os.Exit(code) } diff --git a/internal/metastore/kv/querycoord/kv_catalog_test.go b/internal/metastore/kv/querycoord/kv_catalog_test.go index 5a92d605b2..862350ca1b 100644 --- a/internal/metastore/kv/querycoord/kv_catalog_test.go +++ b/internal/metastore/kv/querycoord/kv_catalog_test.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type CatalogTestSuite struct { @@ -21,7 +22,7 @@ type CatalogTestSuite struct { } func (suite *CatalogTestSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *CatalogTestSuite) SetupTest() { diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index 7d876313c6..0c3d4538b1 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -44,7 +44,7 @@ var ( var Params = paramtable.Get() func TestMain(m *testing.M) { - Params.Init() + paramtable.Init() code := m.Run() os.Exit(code) } @@ -279,7 +279,6 @@ func Test_SuffixSnapshotLoad(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - Params.Init() rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" @@ -336,7 +335,6 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - Params.Init() rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" etcdCli, err := etcd.GetEtcdClient( @@ -416,7 +414,6 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - Params.Init() rootPath := fmt.Sprintf("/test/meta/remove-expired-test-%d", randVal) sep := "_ts" @@ -592,7 +589,6 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - Params.Init() rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" @@ -685,7 +681,6 @@ func TestSuffixSnapshot_LoadWithPrefix(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - Params.Init() rootPath := fmt.Sprintf("/test/meta/loadWithPrefix-test-%d", randVal) sep := "_ts" diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 623713fec3..b089857d10 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -1452,7 +1452,7 @@ func TestRocksmq_Info(t *testing.T) { func TestRocksmq_ParseCompressionTypeError(t *testing.T) { params := paramtable.Get() - params.Init() + paramtable.Init() params.Save(params.RocksmqCfg.CompressionTypes.Key, "invalid,1") _, err := parseCompressionType(params) assert.Error(t, err) diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go index 6b157113dc..af42fbee4b 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go @@ -40,7 +40,7 @@ func TestRmqRetention_Basic(t *testing.T) { defer os.RemoveAll(metaPath) params := paramtable.Get() - params.Init() + paramtable.Init() params.Save(params.RocksmqCfg.PageSize.Key, "10") params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "2") @@ -135,7 +135,7 @@ func TestRmqRetention_NotConsumed(t *testing.T) { defer os.RemoveAll(metaPath) params := paramtable.Get() - params.Init() + paramtable.Init() params.Save(params.RocksmqCfg.PageSize.Key, "10") params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "2") @@ -245,7 +245,7 @@ func TestRmqRetention_MultipleTopic(t *testing.T) { os.RemoveAll(metaPath) params := paramtable.Get() - params.Init() + paramtable.Init() params.Save(params.RocksmqCfg.PageSize.Key, "10") params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "1") @@ -412,8 +412,7 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) { defer os.RemoveAll(metaPath) - params := paramtable.Get() - params.Init() + paramtable.Init() rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.NoError(t, err) assert.NotNil(t, rmq) @@ -468,7 +467,7 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) { os.RemoveAll(metaPath) params := paramtable.Get() - params.Init() + paramtable.Init() params.Save(params.RocksmqCfg.PageSize.Key, "10") params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "1") @@ -594,7 +593,7 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) { os.RemoveAll(metaPath) params := paramtable.Get() - params.Init() + paramtable.Init() params.Save(params.RocksmqCfg.PageSize.Key, "10") params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "1") diff --git a/internal/proxy/accesslog/access_log_test.go b/internal/proxy/accesslog/access_log_test.go index 328de7869c..755b9f20c5 100644 --- a/internal/proxy/accesslog/access_log_test.go +++ b/internal/proxy/accesslog/access_log_test.go @@ -35,7 +35,7 @@ import ( func TestAccessLogger_NotEnable(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) Params.Save(Params.ProxyCfg.AccessLog.Enable.Key, "false") InitAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg) @@ -66,7 +66,7 @@ func TestAccessLogger_NotEnable(t *testing.T) { func TestAccessLogger_Basic(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/accesstest" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) defer os.RemoveAll(testPath) @@ -99,7 +99,7 @@ func TestAccessLogger_Basic(t *testing.T) { func TestAccessLogger_Stdout(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) Params.Save(Params.ProxyCfg.AccessLog.Filename.Key, "") InitAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg) @@ -129,7 +129,7 @@ func TestAccessLogger_Stdout(t *testing.T) { func TestAccessLogger_WithMinio(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/accesstest" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) Params.Save(Params.ProxyCfg.AccessLog.MinioEnable.Key, "true") @@ -173,7 +173,7 @@ func TestAccessLogger_WithMinio(t *testing.T) { func TestAccessLogger_Error(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/accesstest" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) defer os.RemoveAll(testPath) diff --git a/internal/proxy/accesslog/log_writer_test.go b/internal/proxy/accesslog/log_writer_test.go index c45e90b8cf..e5476c897f 100644 --- a/internal/proxy/accesslog/log_writer_test.go +++ b/internal/proxy/accesslog/log_writer_test.go @@ -38,7 +38,7 @@ func getText(size int) []byte { } func TestRotateLogger_Basic(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/accesstest" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) Params.Save(Params.ProxyCfg.AccessLog.MinioEnable.Key, "true") @@ -67,7 +67,7 @@ func TestRotateLogger_Basic(t *testing.T) { func TestRotateLogger_TimeRotate(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/accesstest" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) Params.Save(Params.ProxyCfg.AccessLog.MinioEnable.Key, "true") @@ -95,7 +95,7 @@ func TestRotateLogger_TimeRotate(t *testing.T) { func TestRotateLogger_SizeRotate(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/accesstest" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) Params.Save(Params.ProxyCfg.AccessLog.MinioEnable.Key, "true") @@ -129,7 +129,7 @@ func TestRotateLogger_SizeRotate(t *testing.T) { func TestRotateLogger_LocalRetention(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/accesstest" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) Params.Save(Params.ProxyCfg.AccessLog.MaxBackups.Key, "1") @@ -150,7 +150,7 @@ func TestRotateLogger_LocalRetention(t *testing.T) { func TestRotateLogger_BasicError(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "" Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath) @@ -174,7 +174,7 @@ func TestRotateLogger_BasicError(t *testing.T) { func TestRotateLogger_InitError(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "" params.Save(params.ProxyCfg.AccessLog.LocalPath.Key, testPath) params.Save(params.ProxyCfg.AccessLog.MinioEnable.Key, "true") diff --git a/internal/proxy/accesslog/minio_handler_test.go b/internal/proxy/accesslog/minio_handler_test.go index cb947df33f..a298f506c8 100644 --- a/internal/proxy/accesslog/minio_handler_test.go +++ b/internal/proxy/accesslog/minio_handler_test.go @@ -29,7 +29,7 @@ import ( func TestMinioHandler_ConnectError(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) params.Save(params.MinioCfg.UseIAM.Key, "true") params.Save(params.MinioCfg.Address.Key, "") @@ -49,7 +49,7 @@ func TestMinioHandler_Join(t *testing.T) { func TestMinHandler_Basic(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/miniotest" Params.Save(Params.ProxyCfg.AccessLog.MinioEnable.Key, "true") Params.Save(Params.ProxyCfg.AccessLog.RemotePath.Key, "accesslog") @@ -95,7 +95,7 @@ func TestMinHandler_Basic(t *testing.T) { func TestMinioHandler_WithTimeRetention(t *testing.T) { var Params paramtable.ComponentParam - Params.Init() + Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) testPath := "/tmp/miniotest" Params.Save(Params.ProxyCfg.AccessLog.MinioEnable.Key, "true") Params.Save(Params.ProxyCfg.AccessLog.RemotePath.Key, "accesslog") diff --git a/internal/proxy/hook_interceptor.go b/internal/proxy/hook_interceptor.go index c2f283593a..a86e640cf6 100644 --- a/internal/proxy/hook_interceptor.go +++ b/internal/proxy/hook_interceptor.go @@ -63,13 +63,13 @@ func initHook() error { if !ok { return fmt.Errorf("fail to convert the `Hook` interface") } - if err = hoo.Init(Params.HookCfg.SoConfig.GetValue()); err != nil { + if err = hoo.Init(paramtable.GetHookParams().SoConfig.GetValue()); err != nil { return fmt.Errorf("fail to init configs for the hook, error: %s", err.Error()) } - Params.HookCfg.WatchHookWithPrefix("watch_hook", "", func(event *config.Event) { + paramtable.GetHookParams().WatchHookWithPrefix("watch_hook", "", func(event *config.Event) { log.Info("receive the hook refresh event", zap.Any("event", event)) go func() { - soConfig := Params.HookCfg.SoConfig.GetValue() + soConfig := paramtable.GetHookParams().SoConfig.GetValue() log.Info("refresh hook configs", zap.Any("config", soConfig)) if err = hoo.Init(soConfig); err != nil { log.Panic("fail to init configs for the hook when refreshing", zap.Error(err)) diff --git a/internal/proxy/lb_policy_test.go b/internal/proxy/lb_policy_test.go index 58b156dff9..956969cc49 100644 --- a/internal/proxy/lb_policy_test.go +++ b/internal/proxy/lb_policy_test.go @@ -58,7 +58,7 @@ type LBPolicySuite struct { } func (s *LBPolicySuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (s *LBPolicySuite) SetupTest() { diff --git a/internal/proxy/proxy_rpc_test.go b/internal/proxy/proxy_rpc_test.go index 993a36eed6..2566a35e00 100644 --- a/internal/proxy/proxy_rpc_test.go +++ b/internal/proxy/proxy_rpc_test.go @@ -32,11 +32,12 @@ func TestProxyRpcLimit(t *testing.T) { localMsg := true factory := dependency.NewDefaultFactory(localMsg) + bt := paramtable.NewBaseTable(paramtable.SkipRemote(true)) base := ¶mtable.ComponentParam{} - base.Init() + base.Init(bt) var p paramtable.GrpcServerConfig assert.NoError(t, err) - p.Init(typeutil.ProxyRole, &base.BaseTable) + p.Init(typeutil.ProxyRole, bt) base.Save("proxy.grpc.serverMaxRecvSize", "1") assert.Equal(t, p.ServerMaxRecvSize.GetAsInt(), 1) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index b129dc2b39..895162350d 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -439,10 +439,11 @@ func TestProxy(t *testing.T) { testServer := newProxyTestServer(proxy) wg.Add(1) + bt := paramtable.NewBaseTable(paramtable.SkipRemote(true)) base := ¶mtable.ComponentParam{} - base.Init() + base.Init(bt) var p paramtable.GrpcServerConfig - p.Init(typeutil.ProxyRole, &base.BaseTable) + p.Init(typeutil.ProxyRole, bt) testServer.Proxy.SetAddress(p.GetAddress()) assert.Equal(t, p.GetAddress(), testServer.Proxy.GetAddress()) diff --git a/internal/proxy/task_index_test.go b/internal/proxy/task_index_test.go index d174f66f6c..8d7aa736e0 100644 --- a/internal/proxy/task_index_test.go +++ b/internal/proxy/task_index_test.go @@ -117,7 +117,7 @@ func TestDropIndexTask_PreExecute(t *testing.T) { fieldName := "field1" indexName := "_default_idx_101" - Params.Init() + paramtable.Init() qc := getMockQueryCoord() qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ Status: &commonpb.Status{ @@ -607,7 +607,7 @@ func Test_wrapUserIndexParams(t *testing.T) { } func Test_parseIndexParams_AutoIndex(t *testing.T) { - Params.Init() + paramtable.Init() mgr := config.NewManager() mgr.SetConfig("autoIndex.enable", "false") mgr.SetConfig("autoIndex.params.build", `{"M": 30,"efConstruction": 360,"index_type": "HNSW", "metric_type": "IP"}`) diff --git a/internal/proxy/task_statistic_test.go b/internal/proxy/task_statistic_test.go index 78397f4cf3..f50d8c4b14 100644 --- a/internal/proxy/task_statistic_test.go +++ b/internal/proxy/task_statistic_test.go @@ -49,7 +49,7 @@ type StatisticTaskSuite struct { } func (s *StatisticTaskSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (s *StatisticTaskSuite) SetupTest() { diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index b3fd639688..207ee8153d 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type RowCountBasedBalancerTestSuite struct { @@ -44,7 +45,7 @@ type RowCountBasedBalancerTestSuite struct { } func (suite *RowCountBasedBalancerTestSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *RowCountBasedBalancerTestSuite) SetupTest() { diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index e9486b8fbc..c65d5dfc2c 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type ScoreBasedBalancerTestSuite struct { @@ -43,7 +44,7 @@ type ScoreBasedBalancerTestSuite struct { } func (suite *ScoreBasedBalancerTestSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *ScoreBasedBalancerTestSuite) SetupTest() { diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 037c4529ba..9b0451b120 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -49,7 +49,7 @@ type BalanceCheckerTestSuite struct { } func (suite *BalanceCheckerTestSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *BalanceCheckerTestSuite) SetupTest() { diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index 1e53fb091d..4223a0e120 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type ChannelCheckerTestSuite struct { @@ -47,7 +48,7 @@ type ChannelCheckerTestSuite struct { } func (suite *ChannelCheckerTestSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *ChannelCheckerTestSuite) SetupTest() { diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 8adb58a335..eca3e01047 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/atomic" @@ -52,7 +53,7 @@ type CheckerControllerSuite struct { } func (suite *CheckerControllerSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *CheckerControllerSuite) SetupTest() { diff --git a/internal/querycoordv2/checkers/index_checker_test.go b/internal/querycoordv2/checkers/index_checker_test.go index 8eb66df2d4..f17fc453f1 100644 --- a/internal/querycoordv2/checkers/index_checker_test.go +++ b/internal/querycoordv2/checkers/index_checker_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -45,7 +46,7 @@ type IndexCheckerSuite struct { } func (suite *IndexCheckerSuite) SetupSuite() { - params.Params.Init() + paramtable.Init() } func (suite *IndexCheckerSuite) SetupTest() { diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 209afa5a36..e88aacc6e3 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type SegmentCheckerTestSuite struct { @@ -48,7 +49,7 @@ type SegmentCheckerTestSuite struct { } func (suite *SegmentCheckerTestSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *SegmentCheckerTestSuite) SetupTest() { diff --git a/internal/querycoordv2/dist/dist_controller_test.go b/internal/querycoordv2/dist/dist_controller_test.go index 2214607a92..e6238ae5a1 100644 --- a/internal/querycoordv2/dist/dist_controller_test.go +++ b/internal/querycoordv2/dist/dist_controller_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -50,7 +51,7 @@ type DistControllerTestSuite struct { } func (suite *DistControllerTestSuite) SetupTest() { - Params.Init() + paramtable.Init() var err error config := GenerateEtcdConfig() diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index f7930ccc9d..618747e2a0 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) const ( @@ -74,7 +75,7 @@ type JobSuite struct { } func (suite *JobSuite) SetupSuite() { - Params.Init() + paramtable.Init() suite.collections = []int64{1000, 1001} suite.partitions = map[int64][]int64{ diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index 13430eb115..d74e1de25d 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -34,6 +34,7 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type CollectionManagerSuite struct { @@ -57,7 +58,7 @@ type CollectionManagerSuite struct { } func (suite *CollectionManagerSuite) SetupSuite() { - Params.Init() + paramtable.Init() suite.collections = []int64{100, 101, 102, 103} suite.partitions = map[int64][]int64{ diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index dd95128011..5255f59437 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type ReplicaManagerSuite struct { @@ -44,7 +45,7 @@ type ReplicaManagerSuite struct { } func (suite *ReplicaManagerSuite) SetupSuite() { - Params.Init() + paramtable.Init() suite.nodes = []int64{1, 2, 3} suite.collections = []int64{100, 101, 102} diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 908f9c783d..00a8fde88e 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -42,7 +43,7 @@ type ResourceManagerSuite struct { } func (suite *ResourceManagerSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *ResourceManagerSuite) SetupTest() { diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 157d71081f..b57449c431 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -35,6 +35,7 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -58,7 +59,7 @@ type TargetManagerSuite struct { } func (suite *TargetManagerSuite) SetupSuite() { - Params.Init() + paramtable.Init() suite.collections = []int64{1000, 1001} suite.partitions = map[int64][]int64{ 1000: {100, 101}, diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 29a8ca926e..d213b3254b 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -74,7 +74,7 @@ type CollectionObserverSuite struct { } func (suite *CollectionObserverSuite) SetupSuite() { - Params.Init() + paramtable.Init() suite.collections = []int64{100, 101, 102, 103} suite.partitions = map[int64][]int64{ diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index 5268ce8f93..ab240a41d7 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type LeaderObserverTestSuite struct { @@ -50,7 +51,7 @@ type LeaderObserverTestSuite struct { } func (suite *LeaderObserverTestSuite) SetupSuite() { - Params.Init() + paramtable.Init() } func (suite *LeaderObserverTestSuite) SetupTest() { diff --git a/internal/querycoordv2/params/params.go b/internal/querycoordv2/params/params.go index 6455e50865..e544051873 100644 --- a/internal/querycoordv2/params/params.go +++ b/internal/querycoordv2/params/params.go @@ -40,7 +40,7 @@ func GenerateEtcdConfig() *paramtable.EtcdConfig { rand.Seed(time.Now().UnixNano()) suffix := "-test-querycoord" + strconv.FormatInt(rand.Int63(), 10) - Params.BaseTable.Save("etcd.rootPath", config.MetaRootPath.GetValue()+suffix) + Params.Save("etcd.rootPath", config.MetaRootPath.GetValue()+suffix) return &Params.EtcdCfg } diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 1f1e3defb6..2b97d09e2e 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -86,7 +86,7 @@ type ServerSuite struct { } func (suite *ServerSuite) SetupSuite() { - Params.Init() + paramtable.Init() params.GenerateEtcdConfig() suite.collections = []int64{1000, 1001} diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 097bcea1e1..6c1779867f 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -82,7 +83,7 @@ type TaskSuite struct { } func (suite *TaskSuite) SetupSuite() { - Params.Init() + paramtable.Init() suite.collection = 1000 suite.replica = 10 suite.subChannels = []string{ diff --git a/internal/querycoordv2/utils/meta_test.go b/internal/querycoordv2/utils/meta_test.go index febdeafee0..23b6eced87 100644 --- a/internal/querycoordv2/utils/meta_test.go +++ b/internal/querycoordv2/utils/meta_test.go @@ -31,11 +31,12 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestSpawnReplicasWithRG(t *testing.T) { - Params.Init() + paramtable.Init() config := GenerateEtcdConfig() cli, _ := etcd.GetEtcdClient( config.UseEmbedEtcd.GetAsBool(), @@ -118,7 +119,7 @@ func TestSpawnReplicasWithRG(t *testing.T) { } func TestAddNodesToCollectionsInRGFailed(t *testing.T) { - Params.Init() + paramtable.Init() store := mocks.NewQueryCoordCatalog(t) store.EXPECT().SaveCollection(mock.Anything).Return(nil) @@ -180,7 +181,7 @@ func TestAddNodesToCollectionsInRGFailed(t *testing.T) { } func TestAddNodesToCollectionsInRG(t *testing.T) { - Params.Init() + paramtable.Init() store := mocks.NewQueryCoordCatalog(t) store.EXPECT().SaveCollection(mock.Anything).Return(nil) diff --git a/internal/querynodev2/pipeline/delete_node_test.go b/internal/querynodev2/pipeline/delete_node_test.go index 09620f6c0f..52b430ac57 100644 --- a/internal/querynodev2/pipeline/delete_node_test.go +++ b/internal/querynodev2/pipeline/delete_node_test.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -45,6 +46,7 @@ type DeleteNodeSuite struct { } func (suite *DeleteNodeSuite) SetupSuite() { + paramtable.Init() suite.collectionID = 111 suite.collectionName = "test-collection" suite.partitionIDs = []int64{11, 22} diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 0ee06adc85..97b8d49bb0 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -178,7 +178,7 @@ func (node *QueryNode) Register() error { // InitSegcore set init params of segCore, such as chunckRows, SIMD type... func (node *QueryNode) InitSegcore() error { - cGlogConf := C.CString(path.Join(paramtable.Get().BaseTable.GetConfigDir(), paramtable.DefaultGlogConf)) + cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf)) C.SegcoreInit(cGlogConf) C.free(unsafe.Pointer(cGlogConf)) diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go index 7f078f9374..f517ed5049 100644 --- a/internal/rootcoord/dml_channels_test.go +++ b/internal/rootcoord/dml_channels_test.go @@ -131,7 +131,6 @@ func TestDmlChannels(t *testing.T) { defer cancel() factory := dependency.NewDefaultFactory(true) - Params.Init() dml := newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum) chanNames := dml.listChannels() diff --git a/internal/rootcoord/drop_partition_task_test.go b/internal/rootcoord/drop_partition_task_test.go index b25074b764..2552a16fba 100644 --- a/internal/rootcoord/drop_partition_task_test.go +++ b/internal/rootcoord/drop_partition_task_test.go @@ -66,8 +66,6 @@ func Test_dropPartitionTask_Prepare(t *testing.T) { }) t.Run("normal case", func(t *testing.T) { - Params.Init() - collectionName := funcutil.GenRandomStr() coll := &model.Collection{Name: collectionName} diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index aaba6dad47..0a1635797e 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -470,7 +470,6 @@ func TestMetaTable_getCollectionByIDInternal(t *testing.T) { }) t.Run("normal case, filter unavailable partitions", func(t *testing.T) { - Params.Init() meta := &MetaTable{ collID2Meta: map[typeutil.UniqueID]*model.Collection{ 100: { @@ -492,7 +491,6 @@ func TestMetaTable_getCollectionByIDInternal(t *testing.T) { }) t.Run("get latest version", func(t *testing.T) { - Params.Init() meta := &MetaTable{ collID2Meta: map[typeutil.UniqueID]*model.Collection{ 100: { diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index f1d4cdf647..f1ebe477f0 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -661,7 +661,7 @@ func withTtSynchronizer(ticker *timetickSync) Opt { } func newRocksMqTtSynchronizer() *timetickSync { - Params.Init() + paramtable.Init() paramtable.Get().Save(Params.RootCoordCfg.DmlChannelNum.Key, "4") ctx := context.Background() factory := dependency.NewDefaultFactory(true) @@ -1063,7 +1063,7 @@ func newTickerWithMockNormalStream() *timetickSync { } func newTickerWithFactory(factory msgstream.Factory) *timetickSync { - Params.Init() + paramtable.Init() paramtable.Get().Save(Params.RootCoordCfg.DmlChannelNum.Key, "4") ctx := context.Background() chans := map[UniqueID][]string{} diff --git a/internal/rootcoord/proxy_client_manager_test.go b/internal/rootcoord/proxy_client_manager_test.go index 320cc71f26..b151f507a9 100644 --- a/internal/rootcoord/proxy_client_manager_test.go +++ b/internal/rootcoord/proxy_client_manager_test.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type proxyMock struct { @@ -101,7 +102,7 @@ func (p *proxyMock) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Ref } func TestProxyClientManager_GetProxyClients(t *testing.T) { - Params.Init() + paramtable.Init() core, err := NewCore(context.Background(), nil) assert.NoError(t, err) @@ -132,7 +133,7 @@ func TestProxyClientManager_GetProxyClients(t *testing.T) { } func TestProxyClientManager_AddProxyClient(t *testing.T) { - Params.Init() + paramtable.Init() core, err := NewCore(context.Background(), nil) assert.NoError(t, err) diff --git a/internal/rootcoord/proxy_manager_test.go b/internal/rootcoord/proxy_manager_test.go index 85e7c3b0f5..6660824964 100644 --- a/internal/rootcoord/proxy_manager_test.go +++ b/internal/rootcoord/proxy_manager_test.go @@ -29,11 +29,12 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestProxyManager(t *testing.T) { - Params.Init() + paramtable.Init() etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), @@ -109,7 +110,7 @@ func TestProxyManager(t *testing.T) { } func TestProxyManager_ErrCompacted(t *testing.T) { - Params.Init() + paramtable.Init() etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 3d06c4b2a6..b9d65ba089 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -63,7 +63,7 @@ func (d *dataCoordMockForQuota) GetMetrics(ctx context.Context, request *milvusp } func TestQuotaCenter(t *testing.T) { - Params.Init() + paramtable.Init() ctx, cancel := context.WithCancel(context.Background()) defer cancel() core, err := NewCore(ctx, nil) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 5ac6c734db..754304fd5d 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -890,7 +890,7 @@ func TestRootCoord_ShowConfigurations(t *testing.T) { }) t.Run("normal case", func(t *testing.T) { - Params.Init() + paramtable.Init() pattern := "rootcoord.Port" req := &internalpb.ShowConfigurationsRequest{ @@ -1634,8 +1634,8 @@ func TestCore_startTimeTickLoop(t *testing.T) { // make sure the main functions work well when EnableActiveStandby=true func TestRootcoord_EnableActiveStandby(t *testing.T) { randVal := rand.Int() - Params.Init() - Params.BaseTable.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) + paramtable.Init() + Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "true") paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal)) paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal)) @@ -1682,8 +1682,8 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { // make sure the main functions work well when EnableActiveStandby=false func TestRootcoord_DisableActiveStandby(t *testing.T) { randVal := rand.Int() - Params.Init() - Params.BaseTable.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) + paramtable.Init() + Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "false") paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal)) paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal)) diff --git a/internal/rootcoord/scheduler_test.go b/internal/rootcoord/scheduler_test.go index 40de36978c..b48c4c8963 100644 --- a/internal/rootcoord/scheduler_test.go +++ b/internal/rootcoord/scheduler_test.go @@ -198,7 +198,7 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) { } ctx := context.Background() s := newScheduler(ctx, idAlloc, tsoAlloc) - Params.Init() + paramtable.Init() paramtable.Get().Save(Params.ProxyCfg.TimeTickInterval.Key, "1") s.Start() @@ -233,7 +233,7 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) { } ctx := context.Background() s := newScheduler(ctx, idAlloc, tsoAlloc) - Params.Init() + paramtable.Init() paramtable.Get().Save(Params.ProxyCfg.TimeTickInterval.Key, "1") s.Start() diff --git a/internal/rootcoord/timestamp_bench_test.go b/internal/rootcoord/timestamp_bench_test.go index 21e723f105..92e5232ddb 100644 --- a/internal/rootcoord/timestamp_bench_test.go +++ b/internal/rootcoord/timestamp_bench_test.go @@ -30,10 +30,11 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func getTestEtcdCli() *clientv3.Client { - Params.Init() + paramtable.Init() cli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), Params.EtcdCfg.EtcdUseSSL.GetAsBool(), diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index 20bb93bd09..54feec3e1f 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -21,24 +21,20 @@ import ( "io" "math/rand" "path" - "strconv" "strings" "testing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/milvus-io/milvus/pkg/util/paramtable" ) // TODO: NewMinioChunkManager is deprecated. Rewrite this unittest. func newMinIOChunkManager(ctx context.Context, bucketName string, rootPath string) (*MinioChunkManager, error) { endPoint := getMinioAddress() - accessKeyID, _ := Params.Load("minio.accessKeyID") - secretAccessKey, _ := Params.Load("minio.secretAccessKey") - useSSLStr, _ := Params.Load("minio.useSSL") - useSSL, _ := strconv.ParseBool(useSSLStr) + accessKeyID := Params.MinioCfg.AccessKeyID.GetValue() + secretAccessKey := Params.MinioCfg.SecretAccessKey.GetValue() + useSSL := Params.MinioCfg.UseSSL.GetAsBool() client, err := NewMinioChunkManager(ctx, RootPath(rootPath), Address(endPoint), @@ -57,23 +53,21 @@ func newMinIOChunkManager(ctx context.Context, bucketName string, rootPath strin } func getMinioAddress() string { - minioHost := Params.GetWithDefault("minio.address", paramtable.DefaultMinioHost) + minioHost := Params.MinioCfg.Address.GetValue() if strings.Contains(minioHost, ":") { return minioHost } - port := Params.GetWithDefault("minio.port", paramtable.DefaultMinioPort) + port := Params.MinioCfg.Port.GetValue() return minioHost + ":" + port } func TestMinIOCMFail(t *testing.T) { ctx := context.Background() - endPoint, _ := Params.Load("9.9.9.9") - accessKeyID, _ := Params.Load("minio.accessKeyID") - secretAccessKey, _ := Params.Load("minio.secretAccessKey") - useSSLStr, _ := Params.Load("minio.useSSL") - useSSL, _ := strconv.ParseBool(useSSLStr) + accessKeyID := Params.MinioCfg.AccessKeyID.GetValue() + secretAccessKey := Params.MinioCfg.SecretAccessKey.GetValue() + useSSL := Params.MinioCfg.UseSSL.GetAsBool() client, err := NewMinioChunkManager(ctx, - Address(endPoint), + Address("9.9.9.9:invalid"), AccessKeyID(accessKeyID), SecretAccessKeyID(secretAccessKey), UseSSL(useSSL), @@ -86,12 +80,9 @@ func TestMinIOCMFail(t *testing.T) { } func TestMinIOCM(t *testing.T) { - Params.Init() - testBucket, err := Params.Load("minio.bucketName") - require.NoError(t, err) + testBucket := Params.MinioCfg.BucketName.GetValue() - configRoot, err := Params.Load("minio.rootPath") - require.NoError(t, err) + configRoot := Params.MinioCfg.RootPath.GetValue() testMinIOKVRoot := path.Join(configRoot, "milvus-minio-ut-root") diff --git a/internal/storage/vector_chunk_manager_test.go b/internal/storage/vector_chunk_manager_test.go index ece75424e5..eb875a4319 100644 --- a/internal/storage/vector_chunk_manager_test.go +++ b/internal/storage/vector_chunk_manager_test.go @@ -136,7 +136,7 @@ var Params = paramtable.Get() var localPath = "/tmp/milvus_test/chunkmanager/" func TestMain(m *testing.M) { - Params.Init() + paramtable.Init() exitCode := m.Run() err := os.RemoveAll(localPath) if err != nil { diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index a7cc5d48ac..6b7611b52d 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -234,7 +235,7 @@ func Test_ImportWrapperRowBased(t *testing.T) { err := os.MkdirAll(TempFilesPath, os.ModePerm) assert.NoError(t, err) defer os.RemoveAll(TempFilesPath) - params.Params.Init() + paramtable.Init() // NewDefaultFactory() use "/tmp/milvus" as default root path, and cannot specify root path // NewChunkManagerFactory() can specify the root path @@ -962,7 +963,7 @@ func Test_ImportWrapperUpdateProgressPercent(t *testing.T) { func Test_ImportWrapperFlushFunc(t *testing.T) { ctx := context.Background() - params.Params.Init() + paramtable.Init() shardID := 0 partitionID := int64(1) diff --git a/internal/util/indexcgowrapper/codec_index_test.go b/internal/util/indexcgowrapper/codec_index_test.go index 9ec002d5a4..fc8b1b05b4 100644 --- a/internal/util/indexcgowrapper/codec_index_test.go +++ b/internal/util/indexcgowrapper/codec_index_test.go @@ -2,6 +2,7 @@ package indexcgowrapper import ( "math/rand" + "os" "strconv" "testing" @@ -13,8 +14,15 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) +func TestMain(m *testing.M) { + paramtable.Init() + exitCode := m.Run() + os.Exit(exitCode) +} + type indexTestCase struct { dtype schemapb.DataType typeParams map[string]string @@ -289,17 +297,17 @@ func genIndexCase() []indexTestCase { } func genStorageConfig() *indexpb.StorageConfig { - Params.Init() + params := paramtable.Get() return &indexpb.StorageConfig{ - Address: Params.MinioCfg.Address.GetValue(), - AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(), - SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(), - BucketName: Params.MinioCfg.BucketName.GetValue(), - RootPath: Params.MinioCfg.RootPath.GetValue(), - IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(), - UseSSL: Params.MinioCfg.UseSSL.GetAsBool(), - UseIAM: Params.MinioCfg.UseIAM.GetAsBool(), + Address: params.MinioCfg.Address.GetValue(), + AccessKeyID: params.MinioCfg.AccessKeyID.GetValue(), + SecretAccessKey: params.MinioCfg.SecretAccessKey.GetValue(), + BucketName: params.MinioCfg.BucketName.GetValue(), + RootPath: params.MinioCfg.RootPath.GetValue(), + IAMEndpoint: params.MinioCfg.IAMEndpoint.GetValue(), + UseSSL: params.MinioCfg.UseSSL.GetAsBool(), + UseIAM: params.MinioCfg.UseIAM.GetAsBool(), } } diff --git a/internal/util/indexcgowrapper/index_test.go b/internal/util/indexcgowrapper/index_test.go index ae2011279d..97369bbc13 100644 --- a/internal/util/indexcgowrapper/index_test.go +++ b/internal/util/indexcgowrapper/index_test.go @@ -9,7 +9,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/metric" - "github.com/milvus-io/milvus/pkg/util/paramtable" ) const ( @@ -34,8 +33,6 @@ const ( ef = 200 ) -var Params paramtable.ComponentParam - type vecTestCase struct { indexType string metricType string diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 62e8238c34..431bbb01c4 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -39,7 +39,7 @@ func TestGetServerIDConcurrently(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.EtcdCfg.Endpoints.GetValue() metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") @@ -82,7 +82,7 @@ func TestInit(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.EtcdCfg.Endpoints.GetValue() metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") @@ -110,7 +110,7 @@ func TestUpdateSessions(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.EtcdCfg.Endpoints.GetValue() etcdEndpoints := strings.Split(endpoints, ",") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) @@ -187,7 +187,7 @@ func TestSessionLivenessCheck(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.EtcdCfg.Endpoints.GetValue() metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") @@ -234,7 +234,7 @@ func TestWatcherHandleWatchResp(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.EtcdCfg.Endpoints.GetValue() etcdEndpoints := strings.Split(endpoints, ",") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) @@ -589,10 +589,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { // initial etcd paramtable.Init() params := paramtable.Get() - endpoints, err := params.Load("_EtcdEndpoints") - if err != nil { - panic(err) - } + endpoints := params.EtcdCfg.Endpoints.GetValue() metaRoot := fmt.Sprintf("%d/%s1", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") @@ -686,11 +683,11 @@ func TestSession_apply(t *testing.T) { func TestIntegrationMode(t *testing.T) { ctx := context.Background() + paramtable.Init() params := paramtable.Get() - params.Init() params.Save(params.IntegrationTestCfg.IntegrationMode.Key, "true") - endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.EtcdCfg.Endpoints.GetValue() metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") diff --git a/pkg/config/etcd_source.go b/pkg/config/etcd_source.go index 17ba85565c..353c731eda 100644 --- a/pkg/config/etcd_source.go +++ b/pkg/config/etcd_source.go @@ -25,7 +25,9 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" ) @@ -41,10 +43,10 @@ type EtcdSource struct { keyPrefix string configRefresher *refresher - eh EventHandler } func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) { + log.Debug("init etcd source", zap.Any("etcdInfo", etcdInfo)) etcdCli, err := etcd.GetEtcdClient( etcdInfo.UseEmbed, etcdInfo.UseSSL, @@ -122,7 +124,9 @@ func (es *EtcdSource) UpdateOptions(opts Options) { es.keyPrefix = opts.EtcdInfo.KeyPrefix if es.configRefresher.refreshInterval != opts.EtcdInfo.RefreshInterval { es.configRefresher.stop() + eh := es.configRefresher.eh es.configRefresher = newRefresher(opts.EtcdInfo.RefreshInterval, es.refreshConfigurations) + es.configRefresher.eh = eh es.configRefresher.start(es.GetSourceName()) } } @@ -134,6 +138,7 @@ func (es *EtcdSource) refreshConfigurations() error { ctx, cancel := context.WithTimeout(es.ctx, ReadConfigTimeout) defer cancel() + log.Debug("etcd refreshConfigurations", zap.String("prefix", prefix), zap.Any("endpoints", es.etcdCli.Endpoints())) response, err := es.etcdCli.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return err @@ -144,6 +149,7 @@ func (es *EtcdSource) refreshConfigurations() error { key = strings.TrimPrefix(key, prefix+"/") newConfig[key] = string(kv.Value) newConfig[formatKey(key)] = string(kv.Value) + log.Debug("got config from etcd", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) } es.Lock() defer es.Unlock() diff --git a/pkg/config/manager.go b/pkg/config/manager.go index be806de951..11cf407a40 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -285,7 +285,6 @@ func (m *Manager) getConfigValueBySource(configKey, sourceName string) (string, func (m *Manager) updateEvent(e *Event) error { // refresh all configuration one by one - log.Debug("receive update event", zap.Any("event", e)) if e.HasUpdated { return nil } @@ -328,6 +327,7 @@ func (m *Manager) updateEvent(e *Event) error { } + log.Info("receive update event", zap.Any("event", e)) e.HasUpdated = true return nil } diff --git a/pkg/config/refresher.go b/pkg/config/refresher.go index 7a7f48a248..353629cf00 100644 --- a/pkg/config/refresher.go +++ b/pkg/config/refresher.go @@ -73,7 +73,7 @@ func (r *refresher) refreshPeriodically(name string) { r.stop() } case <-r.intervalDone: - log.Info("stop refreshing configurations") + log.Info("stop refreshing configurations", zap.String("source", name)) return } } diff --git a/pkg/mq/msgdispatcher/mock_test.go b/pkg/mq/msgdispatcher/mock_test.go index f82f4eaa7a..e8035c9c0d 100644 --- a/pkg/mq/msgdispatcher/mock_test.go +++ b/pkg/mq/msgdispatcher/mock_test.go @@ -40,7 +40,7 @@ const ( var Params = paramtable.Get() func TestMain(m *testing.M) { - Params.Init() + paramtable.Init() Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "false") exitCode := m.Run() os.Exit(exitCode) diff --git a/pkg/mq/msgstream/mq_factory_test.go b/pkg/mq/msgstream/mq_factory_test.go index 16479c3fdc..5ae1738e3a 100644 --- a/pkg/mq/msgstream/mq_factory_test.go +++ b/pkg/mq/msgstream/mq_factory_test.go @@ -24,7 +24,6 @@ import ( ) func TestPmsFactory(t *testing.T) { - Params.Init() pmsFactory := NewPmsFactory(&Params.ServiceParam) ctx := context.Background() diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index 5e8356a5c7..c5b34a04ee 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -50,8 +50,8 @@ const ( var Params *paramtable.ComponentParam func TestMain(m *testing.M) { + paramtable.Init() Params = paramtable.Get() - Params.Init() mockKafkaCluster, err := kafka.NewMockCluster(1) defer mockKafkaCluster.Close() if err != nil { @@ -69,16 +69,15 @@ func TestMain(m *testing.M) { } func getPulsarAddress() string { - pulsarHost := Params.GetWithDefault("pulsar.address", "") - port := Params.GetWithDefault("pulsar.port", "") - if len(pulsarHost) != 0 && len(port) != 0 { - return "pulsar://" + pulsarHost + ":" + port + pulsarAddress := Params.PulsarCfg.Address.GetValue() + if len(pulsarAddress) != 0 { + return pulsarAddress } panic("invalid pulsar address") } func getKafkaBrokerList() string { - brokerList := Params.Get("kafka.brokerList") + brokerList := Params.KafkaCfg.Address.GetValue() log.Printf("kafka broker list: %s", brokerList) return brokerList } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index 8b0c133bdc..a006e574d1 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -24,7 +24,7 @@ import ( var Params = paramtable.Get() func TestMain(m *testing.M) { - Params.Init() + paramtable.Init() mockCluster, err := kafka.NewMockCluster(1) defer mockCluster.Close() if err != nil { @@ -41,7 +41,7 @@ func TestMain(m *testing.M) { } func getKafkaBrokerList() string { - brokerList := Params.Get("kafka.brokerList") + brokerList := Params.KafkaCfg.Address.GetValue() log.Info("get kafka broker list.", zap.String("address", brokerList)) return brokerList } diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index ace30fbbca..68d5f48764 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -48,17 +48,16 @@ const ( var Params = paramtable.Get() func TestMain(m *testing.M) { - Params.Init() + paramtable.Init() exitCode := m.Run() os.Exit(exitCode) } func getPulsarAddress() string { - pulsarHost := Params.GetWithDefault("pulsar.address", "") - port := Params.GetWithDefault("pulsar.port", "") - log.Info("pulsar address", zap.String("host", pulsarHost), zap.String("port", port)) - if len(pulsarHost) != 0 && len(port) != 0 { - return "pulsar://" + pulsarHost + ":" + port + pulsarAddress := Params.PulsarCfg.Address.GetValue() + log.Info("pulsar address", zap.String("address", pulsarAddress)) + if len(pulsarAddress) != 0 { + return pulsarAddress } panic("invalid pulsar address") } @@ -745,7 +744,7 @@ func TestPulsarCtl(t *testing.T) { if err != nil { panic(err) } - webport := Params.GetWithDefault("pulsar.webport", "80") + webport := Params.PulsarCfg.WebPort.GetValue() webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport admin, err := NewAdminClient(webServiceURL, "", "") assert.NoError(t, err) diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index 1bac72e7f3..cb04a7c3fd 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -176,7 +176,7 @@ func TestPulsarClientCloseUnsubscribeError(t *testing.T) { if err != nil { panic(err) } - webport := Params.GetWithDefault("pulsar.webport", "80") + webport := Params.PulsarCfg.WebPort.GetValue() webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport admin, err := NewAdminClient(webServiceURL, "", "") assert.NoError(t, err) diff --git a/pkg/util/indexparams/disk_index_params_test.go b/pkg/util/indexparams/disk_index_params_test.go index 4f7f9af2ce..503321733f 100644 --- a/pkg/util/indexparams/disk_index_params_test.go +++ b/pkg/util/indexparams/disk_index_params_test.go @@ -31,7 +31,7 @@ import ( func TestDiskIndexParams(t *testing.T) { t.Run("fill index params without auto index param", func(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) indexParams := make(map[string]string) err := FillDiskIndexParams(¶ms, indexParams) @@ -52,7 +52,7 @@ func TestDiskIndexParams(t *testing.T) { t.Run("fill index params with auto index", func(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) params.Save(params.AutoIndexConfig.Enable.Key, "true") mapString := make(map[string]string) @@ -85,7 +85,7 @@ func TestDiskIndexParams(t *testing.T) { t.Run("fill index params with wrong auto index param", func(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) params.Save(params.AutoIndexConfig.Enable.Key, "true") // ExtraParams wrong params.Save(params.AutoIndexConfig.ExtraParams.Key, "") @@ -148,7 +148,7 @@ func TestDiskIndexParams(t *testing.T) { t.Run("set disk index load params without auto index param", func(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) indexParams := make(map[string]string) err := SetDiskIndexLoadParams(¶ms, indexParams, 100) @@ -201,7 +201,7 @@ func TestDiskIndexParams(t *testing.T) { t.Run("set disk index load params with auto index param", func(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) params.Save(params.AutoIndexConfig.Enable.Key, "true") mapString := make(map[string]string) mapString[BuildRatioKey] = "{\"pq_code_budget_gb\": 0.125, \"num_threads\": 1}" @@ -244,7 +244,7 @@ func TestDiskIndexParams(t *testing.T) { t.Run("set disk index load params with wrong autoindex param", func(t *testing.T) { var params paramtable.ComponentParam - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) params.Save(params.AutoIndexConfig.Enable.Key, "true") // ExtraParams wrong params.Save(params.AutoIndexConfig.ExtraParams.Key, "") diff --git a/pkg/util/interceptor/interceptor_test.go b/pkg/util/interceptor/interceptor_test.go index 6ccfb12779..f9d5d3c87c 100644 --- a/pkg/util/interceptor/interceptor_test.go +++ b/pkg/util/interceptor/interceptor_test.go @@ -40,5 +40,5 @@ func (m *mockSS) Context() context.Context { } func init() { - paramtable.Get().Init() + paramtable.Init() } diff --git a/pkg/util/lock/metrics_mutex_test.go b/pkg/util/lock/metrics_mutex_test.go index 89a97e91d0..8db946c966 100644 --- a/pkg/util/lock/metrics_mutex_test.go +++ b/pkg/util/lock/metrics_mutex_test.go @@ -14,7 +14,7 @@ func TestMetricsLockLock(t *testing.T) { lManager := &MetricsLockManager{ rwLocks: make(map[string]*MetricsRWMutex, 0), } - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) params.Save(params.CommonCfg.EnableLockMetrics.Key, "true") params.Save(params.CommonCfg.LockSlowLogInfoThreshold.Key, "10") lName := "testLock" @@ -43,7 +43,7 @@ func TestMetricsLockRLock(t *testing.T) { lManager := &MetricsLockManager{ rwLocks: make(map[string]*MetricsRWMutex, 0), } - params.Init() + params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) params.Save(params.CommonCfg.EnableLockMetrics.Key, "true") params.Save(params.CommonCfg.LockSlowLogWarnThreshold.Key, "10") lName := "testLock" diff --git a/pkg/util/paramtable/autoindex_param_test.go b/pkg/util/paramtable/autoindex_param_test.go index 762e7b517b..0dcdd99454 100644 --- a/pkg/util/paramtable/autoindex_param_test.go +++ b/pkg/util/paramtable/autoindex_param_test.go @@ -37,7 +37,8 @@ const ( func TestAutoIndexParams_build(t *testing.T) { var CParams ComponentParam - CParams.Init() + bt := NewBaseTable(SkipRemote(true)) + CParams.Init(bt) t.Run("test parseBuildParams success", func(t *testing.T) { //Params := CParams.AutoIndexConfig @@ -51,7 +52,7 @@ func TestAutoIndexParams_build(t *testing.T) { var jsonStrBytes []byte jsonStrBytes, err = json.Marshal(map1) assert.NoError(t, err) - CParams.Save(CParams.AutoIndexConfig.IndexParams.Key, string(jsonStrBytes)) + bt.Save(CParams.AutoIndexConfig.IndexParams.Key, string(jsonStrBytes)) assert.Equal(t, "HNSW", CParams.AutoIndexConfig.IndexType.GetValue()) assert.Equal(t, strconv.Itoa(map1["M"].(int)), CParams.AutoIndexConfig.IndexParams.GetAsJSONMap()["M"]) assert.Equal(t, strconv.Itoa(map1["efConstruction"].(int)), CParams.AutoIndexConfig.IndexParams.GetAsJSONMap()["efConstruction"]) @@ -62,7 +63,7 @@ func TestAutoIndexParams_build(t *testing.T) { } jsonStrBytes, err = json.Marshal(map2) assert.NoError(t, err) - CParams.Save(CParams.AutoIndexConfig.IndexParams.Key, string(jsonStrBytes)) + bt.Save(CParams.AutoIndexConfig.IndexParams.Key, string(jsonStrBytes)) assert.Equal(t, "IVF_FLAT", CParams.AutoIndexConfig.IndexType.GetValue()) assert.Equal(t, strconv.Itoa(map2["nlist"].(int)), CParams.AutoIndexConfig.IndexParams.GetAsJSONMap()["nlist"]) }) diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index 3a1b8c5c81..697df59ceb 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -62,41 +62,70 @@ var defaultYaml = []string{"milvus.yaml"} // BaseTable the basics of paramtable type BaseTable struct { - once sync.Once - mgr *config.Manager + once sync.Once + mgr *config.Manager + config *baseTableConfig +} - configDir string - YamlFiles []string +type baseTableConfig struct { + configDir string + refreshInterval int + skipRemote bool + skipEnv bool + yamlFiles []string +} + +type Option func(*baseTableConfig) + +func Files(files []string) Option { + return func(bt *baseTableConfig) { + bt.yamlFiles = files + } +} + +func Interval(interval int) Option { + return func(bt *baseTableConfig) { + bt.refreshInterval = interval + } +} + +func SkipRemote(skip bool) Option { + return func(bt *baseTableConfig) { + bt.skipRemote = skip + } +} + +func skipEnv(skip bool) Option { + return func(bt *baseTableConfig) { + bt.skipEnv = skip + } } // NewBaseTableFromYamlOnly only used in migration tool. // Maybe we shouldn't limit the configDir internally. func NewBaseTableFromYamlOnly(yaml string) *BaseTable { - mgr, _ := config.Init(config.WithFilesSource(&config.FileInfo{ - Files: []string{yaml}, - RefreshInterval: 10 * time.Second, - })) - gp := &BaseTable{mgr: mgr, YamlFiles: []string{yaml}} - return gp + return NewBaseTable(Files([]string{yaml}), SkipRemote(true), skipEnv(true)) } -// GlobalInitWithYaml initializes the param table with the given yaml. -// We will update the global DefaultYaml variable directly, once and for all. -// GlobalInitWithYaml shall be called at the very beginning before initiating the base table. -// GlobalInitWithYaml should be called only in standalone and embedded Milvus. -func (gp *BaseTable) GlobalInitWithYaml(yaml string) { - gp.once.Do(func() { - defaultYaml = []string{yaml} - }) -} - -func (gp *BaseTable) UpdateSourceOptions(opts ...config.Option) { - gp.mgr.UpdateSourceOptions(opts...) +func NewBaseTable(opts ...Option) *BaseTable { + defaultConfig := &baseTableConfig{ + configDir: initConfPath(), + yamlFiles: defaultYaml, + refreshInterval: 5, + skipRemote: false, + skipEnv: false, + } + for _, opt := range opts { + opt(defaultConfig) + } + bt := &BaseTable{config: defaultConfig} + bt.init() + return bt } // init initializes the param table. // if refreshInterval greater than 0 will auto refresh config from source -func (gp *BaseTable) init(refreshInterval int) { +func (bt *BaseTable) init() { formatter := func(key string) string { ret := strings.ToLower(key) ret = strings.TrimPrefix(ret, "milvus.") @@ -105,35 +134,39 @@ func (gp *BaseTable) init(refreshInterval int) { ret = strings.ReplaceAll(ret, ".", "") return ret } - if len(gp.YamlFiles) == 0 { - gp.YamlFiles = defaultYaml + bt.mgr, _ = config.Init() + if !bt.config.skipEnv { + err := bt.mgr.AddSource(config.NewEnvSource(formatter)) + if err != nil { + log.Warn("init baseTable with env failed", zap.Error(err)) + return + } } - var err error - gp.mgr, err = config.Init(config.WithEnvSource(formatter)) - if err != nil { - return + bt.initConfigsFromLocal() + if !bt.config.skipRemote { + bt.initConfigsFromRemote() } - gp.initConfigsFromLocal(refreshInterval) - gp.initConfigsFromRemote(refreshInterval) + log.Info("Got Config", zap.Any("configs", bt.mgr.GetConfigs())) } -func (gp *BaseTable) initConfigsFromLocal(refreshInterval int) { - gp.configDir = gp.initConfPath() - err := gp.mgr.AddSource(config.NewFileSource(&config.FileInfo{ - Files: lo.Map(gp.YamlFiles, func(file string, _ int) string { - return path.Join(gp.configDir, file) +func (bt *BaseTable) initConfigsFromLocal() { + refreshInterval := bt.config.refreshInterval + err := bt.mgr.AddSource(config.NewFileSource(&config.FileInfo{ + Files: lo.Map(bt.config.yamlFiles, func(file string, _ int) string { + return path.Join(bt.config.configDir, file) }), RefreshInterval: time.Duration(refreshInterval) * time.Second, })) if err != nil { - log.Warn("init baseTable with file failed", zap.Strings("configFile", gp.YamlFiles), zap.Error(err)) + log.Warn("init baseTable with file failed", zap.Strings("configFile", bt.config.yamlFiles), zap.Error(err)) return } } -func (gp *BaseTable) initConfigsFromRemote(refreshInterval int) { +func (bt *BaseTable) initConfigsFromRemote() { + refreshInterval := bt.config.refreshInterval etcdConfig := EtcdConfig{} - etcdConfig.Init(gp) + etcdConfig.Init(bt) etcdConfig.Endpoints.PanicIfEmpty = false etcdConfig.RootPath.PanicIfEmpty = false if etcdConfig.Endpoints.GetValue() == "" { @@ -159,52 +192,57 @@ func (gp *BaseTable) initConfigsFromRemote(refreshInterval int) { log.Info("init with etcd failed", zap.Error(err)) return } - gp.mgr.AddSource(s) - s.SetEventHandler(gp.mgr) + bt.mgr.AddSource(s) + s.SetEventHandler(bt.mgr) } // GetConfigDir returns the config directory -func (gp *BaseTable) GetConfigDir() string { - return gp.configDir +func (bt *BaseTable) GetConfigDir() string { + return bt.config.configDir } -func (gp *BaseTable) initConfPath() string { +func initConfPath() string { // check if user set conf dir through env - configDir, err := gp.mgr.GetConfig("MILVUSCONF") + configDir := os.Getenv("MILVUSCONF") + if len(configDir) != 0 { + return configDir + } + runPath, err := os.Getwd() if err != nil { - runPath, err := os.Getwd() - if err != nil { - panic(err) - } - configDir = runPath + "/configs" - if _, err := os.Stat(configDir); err != nil { - _, fpath, _, _ := runtime.Caller(0) - configDir = path.Dir(fpath) + "/../../../configs" - } + panic(err) + } + configDir = runPath + "/configs" + if _, err := os.Stat(configDir); err != nil { + _, fpath, _, _ := runtime.Caller(0) + configDir = path.Dir(fpath) + "/../../../configs" } return configDir } -func (gp *BaseTable) FileConfigs() map[string]string { - return gp.mgr.FileConfigs() +func (bt *BaseTable) FileConfigs() map[string]string { + return bt.mgr.FileConfigs() +} + +func (bt *BaseTable) UpdateSourceOptions(opts ...config.Option) { + bt.mgr.UpdateSourceOptions(opts...) } // Load loads an object with @key. -func (gp *BaseTable) Load(key string) (string, error) { - return gp.mgr.GetConfig(key) +func (bt *BaseTable) Load(key string) (string, error) { + return bt.mgr.GetConfig(key) } -func (gp *BaseTable) Get(key string) string { - return gp.GetWithDefault(key, "") +func (bt *BaseTable) Get(key string) string { + return bt.GetWithDefault(key, "") } // GetWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. -func (gp *BaseTable) GetWithDefault(key, defaultValue string) string { - if gp.mgr == nil { +func (bt *BaseTable) GetWithDefault(key, defaultValue string) string { + if bt.mgr == nil { return defaultValue } - str, err := gp.mgr.GetConfig(key) + str, err := bt.mgr.GetConfig(key) if err != nil { return defaultValue } @@ -212,19 +250,19 @@ func (gp *BaseTable) GetWithDefault(key, defaultValue string) string { } // Remove Config by key -func (gp *BaseTable) Remove(key string) error { - gp.mgr.DeleteConfig(key) +func (bt *BaseTable) Remove(key string) error { + bt.mgr.DeleteConfig(key) return nil } // Update Config -func (gp *BaseTable) Save(key, value string) error { - gp.mgr.SetConfig(key, value) +func (bt *BaseTable) Save(key, value string) error { + bt.mgr.SetConfig(key, value) return nil } // Reset Config to default value -func (gp *BaseTable) Reset(key string) error { - gp.mgr.ResetConfig(key) +func (bt *BaseTable) Reset(key string) error { + bt.mgr.ResetConfig(key) return nil } diff --git a/pkg/util/paramtable/base_table_test.go b/pkg/util/paramtable/base_table_test.go index e70c90d55f..bcba2b0364 100644 --- a/pkg/util/paramtable/base_table_test.go +++ b/pkg/util/paramtable/base_table_test.go @@ -21,10 +21,10 @@ import ( "github.com/milvus-io/milvus/pkg/config" ) -var baseParams = BaseTable{} +var baseParams = NewBaseTable(SkipRemote(true)) func TestMain(m *testing.M) { - baseParams.init(0) + baseParams.init() code := m.Run() os.Exit(code) } @@ -112,7 +112,7 @@ func TestBaseTable_Get(t *testing.T) { func TestBaseTable_Pulsar(t *testing.T) { //test PULSAR ADDRESS t.Setenv("PULSAR_ADDRESS", "pulsar://localhost:6650") - baseParams.init(0) + baseParams.init() address := baseParams.Get("pulsar.address") assert.Equal(t, "pulsar://localhost:6650", address) @@ -125,7 +125,7 @@ func TestBaseTable_Env(t *testing.T) { t.Setenv("milvus.test", "test") t.Setenv("milvus.test.test2", "test2") - baseParams.init(0) + baseParams.init() result, _ := baseParams.Load("test") assert.Equal(t, result, "test") @@ -134,7 +134,7 @@ func TestBaseTable_Env(t *testing.T) { t.Setenv("milvus.invalid", "xxx=test") - baseParams.init(0) + baseParams.init() result, _ = baseParams.Load("invalid") assert.Equal(t, result, "xxx=test") } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index cbd6d3111a..e13c01e721 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -52,7 +52,8 @@ const ( // ComponentParam is used to quickly and easily access all components' configurations. type ComponentParam struct { ServiceParam - once sync.Once + once sync.Once + baseTable *BaseTable CommonCfg commonConfig QuotaConfig quotaConfig @@ -68,7 +69,6 @@ type ComponentParam struct { IndexNodeCfg indexNodeConfig HTTPCfg httpConfig LogCfg logConfig - HookCfg hookConfig RootCoordGrpcServerCfg GrpcServerConfig ProxyGrpcServerCfg GrpcServerConfig @@ -90,68 +90,68 @@ type ComponentParam struct { } // Init initialize once -func (p *ComponentParam) Init() { +func (p *ComponentParam) Init(bt *BaseTable) { p.once.Do(func() { - p.init() + p.init(bt) }) } // init initialize the global param table -func (p *ComponentParam) init() { - p.ServiceParam.init() +func (p *ComponentParam) init(bt *BaseTable) { + p.baseTable = bt + p.ServiceParam.init(bt) - p.CommonCfg.init(&p.BaseTable) - p.QuotaConfig.init(&p.BaseTable) - p.AutoIndexConfig.init(&p.BaseTable) - p.TraceCfg.init(&p.BaseTable) + p.CommonCfg.init(bt) + p.QuotaConfig.init(bt) + p.AutoIndexConfig.init(bt) + p.TraceCfg.init(bt) - p.RootCoordCfg.init(&p.BaseTable) - p.ProxyCfg.init(&p.BaseTable) - p.QueryCoordCfg.init(&p.BaseTable) - p.QueryNodeCfg.init(&p.BaseTable) - p.DataCoordCfg.init(&p.BaseTable) - p.DataNodeCfg.init(&p.BaseTable) - p.IndexNodeCfg.init(&p.BaseTable) - p.HTTPCfg.init(&p.BaseTable) - p.LogCfg.init(&p.BaseTable) - p.HookCfg.init() + p.RootCoordCfg.init(bt) + p.ProxyCfg.init(bt) + p.QueryCoordCfg.init(bt) + p.QueryNodeCfg.init(bt) + p.DataCoordCfg.init(bt) + p.DataNodeCfg.init(bt) + p.IndexNodeCfg.init(bt) + p.HTTPCfg.init(bt) + p.LogCfg.init(bt) - p.RootCoordGrpcServerCfg.Init("rootCoord", &p.BaseTable) - p.ProxyGrpcServerCfg.Init("proxy", &p.BaseTable) + p.RootCoordGrpcServerCfg.Init("rootCoord", bt) + p.ProxyGrpcServerCfg.Init("proxy", bt) p.ProxyGrpcServerCfg.InternalPort.Export = true - p.QueryCoordGrpcServerCfg.Init("queryCoord", &p.BaseTable) - p.QueryNodeGrpcServerCfg.Init("queryNode", &p.BaseTable) - p.DataCoordGrpcServerCfg.Init("dataCoord", &p.BaseTable) - p.DataNodeGrpcServerCfg.Init("dataNode", &p.BaseTable) - p.IndexNodeGrpcServerCfg.Init("indexNode", &p.BaseTable) + p.QueryCoordGrpcServerCfg.Init("queryCoord", bt) + p.QueryNodeGrpcServerCfg.Init("queryNode", bt) + p.DataCoordGrpcServerCfg.Init("dataCoord", bt) + p.DataNodeGrpcServerCfg.Init("dataNode", bt) + p.IndexNodeGrpcServerCfg.Init("indexNode", bt) - p.RootCoordGrpcClientCfg.Init("rootCoord", &p.BaseTable) - p.ProxyGrpcClientCfg.Init("proxy", &p.BaseTable) - p.QueryCoordGrpcClientCfg.Init("queryCoord", &p.BaseTable) - p.QueryNodeGrpcClientCfg.Init("queryNode", &p.BaseTable) - p.DataCoordGrpcClientCfg.Init("dataCoord", &p.BaseTable) - p.DataNodeGrpcClientCfg.Init("dataNode", &p.BaseTable) - p.IndexNodeGrpcClientCfg.Init("indexNode", &p.BaseTable) + p.RootCoordGrpcClientCfg.Init("rootCoord", bt) + p.ProxyGrpcClientCfg.Init("proxy", bt) + p.QueryCoordGrpcClientCfg.Init("queryCoord", bt) + p.QueryNodeGrpcClientCfg.Init("queryNode", bt) + p.DataCoordGrpcClientCfg.Init("dataCoord", bt) + p.DataNodeGrpcClientCfg.Init("dataNode", bt) + p.IndexNodeGrpcClientCfg.Init("indexNode", bt) - p.IntegrationTestCfg.init(&p.BaseTable) + p.IntegrationTestCfg.init(bt) } func (p *ComponentParam) GetComponentConfigurations(componentName string, sub string) map[string]string { allownPrefixs := append(globalConfigPrefixs(), componentName+".") - return p.mgr.GetBy(config.WithSubstr(sub), config.WithOneOfPrefixs(allownPrefixs...)) + return p.baseTable.mgr.GetBy(config.WithSubstr(sub), config.WithOneOfPrefixs(allownPrefixs...)) } func (p *ComponentParam) GetAll() map[string]string { - return p.mgr.GetConfigs() + return p.baseTable.mgr.GetConfigs() } func (p *ComponentParam) Watch(key string, watcher config.EventHandler) { - p.mgr.Dispatcher.Register(key, watcher) + p.baseTable.mgr.Dispatcher.Register(key, watcher) } func (p *ComponentParam) WatchKeyPrefix(keyPrefix string, watcher config.EventHandler) { - p.mgr.Dispatcher.RegisterForKeyPrefix(keyPrefix, watcher) + p.baseTable.mgr.Dispatcher.RegisterForKeyPrefix(keyPrefix, watcher) } // ///////////////////////////////////////////////////////////////////////////// @@ -2587,3 +2587,16 @@ func (p *integrationTestConfig) init(base *BaseTable) { } p.IntegrationMode.Init(base.mgr) } + +func (params *ComponentParam) Save(key string, value string) error { + return params.baseTable.Save(key, value) +} +func (params *ComponentParam) Remove(key string) error { + return params.baseTable.Remove(key) +} +func (params *ComponentParam) Reset(key string) error { + return params.baseTable.Reset(key) +} +func (params *ComponentParam) GetWithDefault(key string, dft string) string { + return params.baseTable.GetWithDefault(key, dft) +} diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 71fcedc7da..5d7d4e1968 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -397,7 +397,7 @@ func TestForbiddenItem(t *testing.T) { Init() params := Get() - params.mgr.OnEvent(&config.Event{ + params.baseTable.mgr.OnEvent(&config.Event{ Key: params.CommonCfg.ClusterPrefix.Key, Value: "new-cluster", }) diff --git a/pkg/util/paramtable/grpc_param_test.go b/pkg/util/paramtable/grpc_param_test.go index ff9bc2ec0d..235f066896 100644 --- a/pkg/util/paramtable/grpc_param_test.go +++ b/pkg/util/paramtable/grpc_param_test.go @@ -23,9 +23,9 @@ import ( func TestGrpcServerParams(t *testing.T) { role := typeutil.DataNodeRole base := &ComponentParam{} - base.Init() + base.Init(NewBaseTable(SkipRemote(true))) var serverConfig GrpcServerConfig - serverConfig.Init(role, &base.BaseTable) + serverConfig.Init(role, base.baseTable) assert.Equal(t, serverConfig.Domain, role) t.Logf("Domain = %s", serverConfig.Domain) @@ -66,9 +66,9 @@ func TestGrpcServerParams(t *testing.T) { func TestGrpcClientParams(t *testing.T) { role := typeutil.DataNodeRole base := ComponentParam{} - base.Init() + base.Init(NewBaseTable(SkipRemote(true))) var clientConfig GrpcClientConfig - clientConfig.Init(role, &base.BaseTable) + clientConfig.Init(role, base.baseTable) assert.Equal(t, clientConfig.Domain, role) t.Logf("Domain = %s", clientConfig.Domain) diff --git a/pkg/util/paramtable/hook_config.go b/pkg/util/paramtable/hook_config.go index 1fb39ecac0..37bef02437 100644 --- a/pkg/util/paramtable/hook_config.go +++ b/pkg/util/paramtable/hook_config.go @@ -2,8 +2,6 @@ package paramtable import ( "github.com/milvus-io/milvus/pkg/config" - "github.com/milvus-io/milvus/pkg/log" - "go.uber.org/zap" ) const hookYamlFile = "hook.yaml" @@ -15,25 +13,21 @@ type hookConfig struct { SoConfig ParamGroup `refreshable:"true"` } -func (h *hookConfig) init() { - hookBase := &BaseTable{YamlFiles: []string{hookYamlFile}} - hookBase.init(2) - h.hookBase = hookBase - - log.Info("hook config", zap.Any("hook", hookBase.FileConfigs())) +func (h *hookConfig) init(base *BaseTable) { + h.hookBase = base h.SoPath = ParamItem{ Key: "soPath", Version: "2.0.0", DefaultValue: "", } - h.SoPath.Init(hookBase.mgr) + h.SoPath.Init(base.mgr) h.SoConfig = ParamGroup{ KeyPrefix: "", Version: "2.2.0", } - h.SoConfig.Init(hookBase.mgr) + h.SoConfig.Init(base.mgr) } func (h *hookConfig) WatchHookWithPrefix(ident string, keyPrefix string, onEvent func(*config.Event)) { diff --git a/pkg/util/paramtable/http_param_test.go b/pkg/util/paramtable/http_param_test.go index a1c80f9652..8e32450f34 100644 --- a/pkg/util/paramtable/http_param_test.go +++ b/pkg/util/paramtable/http_param_test.go @@ -8,7 +8,7 @@ import ( func TestHTTPConfig_Init(t *testing.T) { params := ComponentParam{} - params.Init() + params.Init(NewBaseTable(SkipRemote(true))) cfg := ¶ms.HTTPCfg assert.Equal(t, cfg.Enabled.GetAsBool(), true) assert.Equal(t, cfg.DebugMode.GetAsBool(), false) diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index e3dab2ace1..d8bf1cd30d 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -25,7 +25,7 @@ import ( func TestQuotaParam(t *testing.T) { qc := quotaConfig{} - qc.init(&baseParams) + qc.init(baseParams) t.Run("test quota", func(t *testing.T) { assert.True(t, qc.QuotaAndLimitsEnabled.GetAsBool()) @@ -48,7 +48,7 @@ func TestQuotaParam(t *testing.T) { }) t.Run("test dml", func(t *testing.T) { - params.Init() + params.Init(NewBaseTable(SkipRemote(true))) params.Save(params.QuotaConfig.DMLLimitEnabled.Key, "true") params.Save(params.QuotaConfig.DMLMaxInsertRate.Key, "10") params.Save(params.QuotaConfig.DMLMinInsertRate.Key, "1") @@ -70,7 +70,7 @@ func TestQuotaParam(t *testing.T) { }) t.Run("test collection dml", func(t *testing.T) { - params.Init() + params.Init(NewBaseTable(SkipRemote(true))) params.Save(params.QuotaConfig.DMLLimitEnabled.Key, "true") params.Save(params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "10") params.Save(params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "1") @@ -128,7 +128,7 @@ func TestQuotaParam(t *testing.T) { }) t.Run("test dql", func(t *testing.T) { - params.Init() + params.Init(NewBaseTable(SkipRemote(true))) params.Save(params.QuotaConfig.DQLLimitEnabled.Key, "true") params.Save(params.QuotaConfig.DQLMaxSearchRate.Key, "10") params.Save(params.QuotaConfig.DQLMinSearchRate.Key, "1") @@ -141,7 +141,7 @@ func TestQuotaParam(t *testing.T) { }) t.Run("test collection dql", func(t *testing.T) { - params.Init() + params.Init(NewBaseTable(SkipRemote(true))) params.Save(params.QuotaConfig.DQLLimitEnabled.Key, "true") params.Save(params.QuotaConfig.DQLMaxSearchRatePerCollection.Key, "10") params.Save(params.QuotaConfig.DQLMinSearchRatePerCollection.Key, "1") diff --git a/pkg/util/paramtable/runtime.go b/pkg/util/paramtable/runtime.go index 2199148b3c..7831dc3755 100644 --- a/pkg/util/paramtable/runtime.go +++ b/pkg/util/paramtable/runtime.go @@ -13,6 +13,7 @@ package paramtable import ( "strconv" + "sync" "time" ) @@ -23,22 +24,45 @@ const ( runtimeUpdateTimeKey = "runtime.updateTime" ) +var once sync.Once var params ComponentParam +var hookParams hookConfig func Init() { - params.Init() + once.Do(func() { + baseTable := NewBaseTable() + params.Init(baseTable) + hookBaseTable := NewBaseTableFromYamlOnly(hookYamlFile) + hookParams.init(hookBaseTable) + }) +} + +func InitWithBaseTable(baseTable *BaseTable) { + once.Do(func() { + params.Init(baseTable) + hookBaseTable := NewBaseTableFromYamlOnly(hookYamlFile) + hookParams.init(hookBaseTable) + }) } func Get() *ComponentParam { return ¶ms } +func GetBaseTable() *BaseTable { + return params.baseTable +} + +func GetHookParams() *hookConfig { + return &hookParams +} + func SetNodeID(newID UniqueID) { - params.Save(runtimeNodeIDKey, strconv.FormatInt(newID, 10)) + params.baseTable.Save(runtimeNodeIDKey, strconv.FormatInt(newID, 10)) } func GetNodeID() UniqueID { - nodeID, err := strconv.ParseInt(params.Get(runtimeNodeIDKey), 10, 64) + nodeID, err := strconv.ParseInt(params.baseTable.Get(runtimeNodeIDKey), 10, 64) if err != nil { return 0 } @@ -46,27 +70,30 @@ func GetNodeID() UniqueID { } func SetRole(role string) { - params.Save(runtimeRoleKey, role) + params.baseTable.Save(runtimeRoleKey, role) } func GetRole() string { - return params.Get(runtimeRoleKey) + if params.baseTable == nil { + return "" + } + return params.baseTable.Get(runtimeRoleKey) } func SetCreateTime(d time.Time) { - params.Save(runtimeCreateTimeKey, strconv.FormatInt(d.UnixNano(), 10)) + params.baseTable.Save(runtimeCreateTimeKey, strconv.FormatInt(d.UnixNano(), 10)) } func GetCreateTime() time.Time { - v, _ := strconv.ParseInt(params.Get(runtimeCreateTimeKey), 10, 64) + v, _ := strconv.ParseInt(params.baseTable.Get(runtimeCreateTimeKey), 10, 64) return time.Unix(v/1e9, v%1e9) } func SetUpdateTime(d time.Time) { - params.Save(runtimeUpdateTimeKey, strconv.FormatInt(d.UnixNano(), 10)) + params.baseTable.Save(runtimeUpdateTimeKey, strconv.FormatInt(d.UnixNano(), 10)) } func GetUpdateTime() time.Time { - v, _ := strconv.ParseInt(params.Get(runtimeUpdateTimeKey), 10, 64) + v, _ := strconv.ParseInt(params.baseTable.Get(runtimeUpdateTimeKey), 10, 64) return time.Unix(v/1e9, v%1e9) } diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index fcd8bbcb5f..2e9196705f 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -42,8 +42,6 @@ const ( // ServiceParam is used to quickly and easily access all basic service configurations. type ServiceParam struct { - BaseTable - LocalStorageCfg LocalStorageConfig MetaStoreCfg MetaStoreConfig EtcdCfg EtcdConfig @@ -55,18 +53,16 @@ type ServiceParam struct { MinioCfg MinioConfig } -func (p *ServiceParam) init() { - p.BaseTable.init(2) - - p.LocalStorageCfg.Init(&p.BaseTable) - p.MetaStoreCfg.Init(&p.BaseTable) - p.EtcdCfg.Init(&p.BaseTable) - p.MQCfg.Init(&p.BaseTable) - p.PulsarCfg.Init(&p.BaseTable) - p.KafkaCfg.Init(&p.BaseTable) - p.RocksmqCfg.Init(&p.BaseTable) - p.NatsmqCfg.Init(&p.BaseTable) - p.MinioCfg.Init(&p.BaseTable) +func (p *ServiceParam) init(bt *BaseTable) { + p.LocalStorageCfg.Init(bt) + p.MetaStoreCfg.Init(bt) + p.EtcdCfg.Init(bt) + p.MQCfg.Init(bt) + p.PulsarCfg.Init(bt) + p.KafkaCfg.Init(bt) + p.RocksmqCfg.Init(bt) + p.NatsmqCfg.Init(bt) + p.MinioCfg.Init(bt) } func (p *ServiceParam) RocksmqEnable() bool { diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 38fca6e90c..ff884a36e6 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -22,8 +22,8 @@ import ( func TestServiceParam(t *testing.T) { var SParams ServiceParam - SParams.init() - + bt := NewBaseTable(SkipRemote(true)) + SParams.init(bt) t.Run("test etcdConfig", func(t *testing.T) { Params := &SParams.EtcdCfg @@ -54,11 +54,13 @@ func TestServiceParam(t *testing.T) { // test UseEmbedEtcd t.Setenv("etcd.use.embed", "true") t.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.ClusterDeployMode) - assert.Panics(t, func() { SParams.init() }) + assert.Panics(t, func() { + NewBaseTable() + }) t.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) t.Setenv("etcd.use.embed", "false") - SParams.init() + SParams.init(bt) }) t.Run("test pulsarConfig", func(t *testing.T) { @@ -77,13 +79,13 @@ func TestServiceParam(t *testing.T) { address := "pulsar://localhost:6650" { - SParams.BaseTable.Save("pulsar.address", address) + bt.Save("pulsar.address", address) assert.Equal(t, SParams.PulsarCfg.Address.GetValue(), address) } { - SParams.BaseTable.Save("pulsar.address", "localhost") - SParams.BaseTable.Save("pulsar.port", "6650") + bt.Save("pulsar.address", "localhost") + bt.Save("pulsar.port", "6650") assert.Equal(t, SParams.PulsarCfg.Address.GetValue(), address) } }) @@ -96,12 +98,12 @@ func TestServiceParam(t *testing.T) { } { - SParams.BaseTable.Save(SParams.PulsarCfg.Address.Key, "u\\invalid") + bt.Save(SParams.PulsarCfg.Address.Key, "u\\invalid") assert.Equal(t, SParams.PulsarCfg.WebAddress.GetValue(), "") } { - SParams.BaseTable.Save(SParams.PulsarCfg.Address.Key, "") + bt.Save(SParams.PulsarCfg.Address.Key, "") assert.Equal(t, SParams.PulsarCfg.WebAddress.GetValue(), "") } }) diff --git a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go index fd6d157d9a..681b22dd3b 100644 --- a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go +++ b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go @@ -87,7 +87,7 @@ func (s *CrossClusterRoutingSuite) SetupSuite() { s.ctx, s.cancel = context.WithTimeout(context.Background(), time.Second*180) rand.Seed(time.Now().UnixNano()) - paramtable.Get().Init() + paramtable.Init() s.factory = dependency.NewDefaultFactory(true) } diff --git a/tests/integration/minicluster.go b/tests/integration/minicluster.go index a34686ed10..f73d68674e 100644 --- a/tests/integration/minicluster.go +++ b/tests/integration/minicluster.go @@ -125,7 +125,7 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster cluster = &MiniCluster{ ctx: ctx, } - params.Init() + paramtable.Init() cluster.params = DefaultParams() cluster.clusterConfig = DefaultClusterConfig() for _, opt := range opts { @@ -134,7 +134,7 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster for k, v := range cluster.params { params.Save(k, v) } - params.UpdateSourceOptions(config.WithEtcdSource(&config.EtcdInfo{ + paramtable.GetBaseTable().UpdateSourceOptions(config.WithEtcdSource(&config.EtcdInfo{ KeyPrefix: cluster.params[EtcdRootPath], RefreshInterval: 2 * time.Second, })) diff --git a/tests/integration/refreshconfig/refresh_config_test.go b/tests/integration/refreshconfig/refresh_config_test.go index 0b3cf6b659..a2f369aeb0 100644 --- a/tests/integration/refreshconfig/refresh_config_test.go +++ b/tests/integration/refreshconfig/refresh_config_test.go @@ -53,7 +53,9 @@ func (s *RefreshConfigSuite) TestRefreshPasswordLength() { params := paramtable.Get() key := fmt.Sprintf("%s/config/proxy/minpasswordlength", params.EtcdCfg.RootPath.GetValue()) - c.EtcdCli.KV.Put(ctx, key, "3") + log.Debug("etcd key", zap.String("key", key), zap.Any("endpoints", c.EtcdCli.Endpoints())) + r, e := c.EtcdCli.KV.Put(ctx, key, "3") + log.Debug("etcd put result", zap.Any("resp", r), zap.Error(e)) s.Eventually(func() bool { resp, err = c.Proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{