diff --git a/deployments/docker/standalone/docker-compose.yml b/deployments/docker/standalone/docker-compose.yml index 4166c9b446..ebe8beaa1c 100644 --- a/deployments/docker/standalone/docker-compose.yml +++ b/deployments/docker/standalone/docker-compose.yml @@ -11,7 +11,7 @@ services: - ETCD_SNAPSHOT_COUNT=50000 volumes: - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd - command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + command: etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd healthcheck: test: ["CMD", "etcdctl", "endpoint", "health"] interval: 30s diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index b49f74a2db..e23a2a2b23 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -52,6 +52,7 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction ResultSegments: lo.Map(task.ResultSegments, func(t int64, i int) string { return strconv.FormatInt(t, 10) }), + NodeID: task.NodeID, } } @@ -70,7 +71,7 @@ func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatal ctx: ctx, catalog: catalog, compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0), - taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](32, nil, time.Minute*15), + taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](512, nil, time.Minute*15), } if err := csm.reloadFromKV(); err != nil { return nil, err diff --git a/internal/datacoord/import_meta.go b/internal/datacoord/import_meta.go index f053689de4..1453c0afaa 100644 --- a/internal/datacoord/import_meta.go +++ b/internal/datacoord/import_meta.go @@ -53,7 +53,7 @@ type importTasks struct { func newImportTasks() *importTasks { return &importTasks{ tasks: make(map[int64]ImportTask), - taskStats: expirable.NewLRU[UniqueID, ImportTask](64, nil, time.Minute*30), + taskStats: expirable.NewLRU[UniqueID, ImportTask](512, nil, time.Minute*30), } } diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 60b59080d3..ac8d03bb3f 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -82,6 +82,7 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats { IndexVersion: s.IndexVersion, CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000), FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000), + NodeID: s.NodeID, } } @@ -98,7 +99,7 @@ func newSegmentIndexBuildInfo() *segmentBuildInfo { // build ID -> segment index buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex), // build ID -> task stats - taskStats: expirable.NewLRU[UniqueID, *metricsinfo.IndexTaskStats](64, nil, time.Minute*30), + taskStats: expirable.NewLRU[UniqueID, *metricsinfo.IndexTaskStats](1024, nil, time.Minute*30), } } diff --git a/internal/datacoord/job_manager_test.go b/internal/datacoord/job_manager_test.go index 0a94a76e05..1c8d480b61 100644 --- a/internal/datacoord/job_manager_test.go +++ b/internal/datacoord/job_manager_test.go @@ -104,7 +104,7 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() { allocator: alloc, tasks: make(map[int64]Task), meta: mt, - taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*5), + taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5), }, allocator: alloc, } diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index e04d6d0ceb..def9b04bb8 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -93,7 +93,7 @@ func newTaskScheduler( handler: handler, indexEngineVersionManager: indexEngineVersionManager, allocator: allocator, - taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15), + taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*15), compactionHandler: compactionHandler, } ts.reloadFromMeta() diff --git a/internal/flushcommon/syncmgr/sync_manager.go b/internal/flushcommon/syncmgr/sync_manager.go index da0ee0e99a..d64c813cf1 100644 --- a/internal/flushcommon/syncmgr/sync_manager.go +++ b/internal/flushcommon/syncmgr/sync_manager.go @@ -72,7 +72,7 @@ func NewSyncManager(chunkManager storage.ChunkManager) SyncManager { keyLockDispatcher: dispatcher, chunkManager: chunkManager, tasks: typeutil.NewConcurrentMap[string, Task](), - taskStats: expirable.NewLRU[string, Task](16, nil, time.Minute*15), + taskStats: expirable.NewLRU[string, Task](64, nil, time.Minute*15), } // setup config update watcher params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index cafa76fb72..184cdf148e 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6755,7 +6755,7 @@ func DeregisterSubLabel(subLabel string) { func (node *Proxy) RegisterRestRouter(router gin.IRouter) { // Cluster request that executed by proxy router.GET(http.ClusterInfoPath, getClusterInfo(node)) - router.GET(http.ClusterConfigsPath, getConfigs(paramtable.Get().GetAll())) + router.GET(http.ClusterConfigsPath, getConfigs(paramtable.Get().GetConfigsView())) router.GET(http.ClusterClientsPath, getConnectedClients) router.GET(http.ClusterDependenciesPath, getDependencies) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index ef3d110740..a997d95c15 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -284,7 +284,7 @@ func NewScheduler(ctx context.Context, channelTasks: NewConcurrentMap[replicaChannelIndex, Task](), processQueue: newTaskQueue(), waitQueue: newTaskQueue(), - taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15), + taskStats: expirable.NewLRU[UniqueID, Task](256, nil, time.Minute*15), segmentTaskDelta: NewExecutingTaskDelta(), channelTaskDelta: NewExecutingTaskDelta(), } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7301a2238d..6041ca1156 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -30,17 +30,17 @@ import ( func TestConfigFromEnv(t *testing.T) { mgr, _ := Init() - _, err := mgr.GetConfig("test.env") + _, _, err := mgr.GetConfig("test.env") assert.ErrorIs(t, err, ErrKeyNotFound) t.Setenv("TEST_ENV", "value") mgr, _ = Init(WithEnvSource(formatKey)) - v, err := mgr.GetConfig("test.env") + _, v, err := mgr.GetConfig("test.env") assert.NoError(t, err) assert.Equal(t, "value", v) - v, err = mgr.GetConfig("TEST_ENV") + _, v, err = mgr.GetConfig("TEST_ENV") assert.NoError(t, err) assert.Equal(t, "value", v) } @@ -67,65 +67,65 @@ func TestConfigFromRemote(t *testing.T) { ctx := context.Background() t.Run("origin is empty", func(t *testing.T) { - _, err = mgr.GetConfig("test.etcd") + _, _, err = mgr.GetConfig("test.etcd") assert.ErrorIs(t, err, ErrKeyNotFound) client.KV.Put(ctx, "test/config/test/etcd", "value") time.Sleep(100 * time.Millisecond) - v, err := mgr.GetConfig("test.etcd") + _, v, err := mgr.GetConfig("test.etcd") assert.NoError(t, err) assert.Equal(t, "value", v) - v, err = mgr.GetConfig("TEST_ETCD") + _, v, err = mgr.GetConfig("TEST_ETCD") assert.NoError(t, err) assert.Equal(t, "value", v) client.KV.Delete(ctx, "test/config/test/etcd") time.Sleep(100 * time.Millisecond) - _, err = mgr.GetConfig("TEST_ETCD") + _, _, err = mgr.GetConfig("TEST_ETCD") assert.ErrorIs(t, err, ErrKeyNotFound) }) t.Run("override origin value", func(t *testing.T) { - v, _ := mgr.GetConfig("tmp.key") + _, v, _ := mgr.GetConfig("tmp.key") assert.Equal(t, "1", v) client.KV.Put(ctx, "test/config/tmp/key", "2") time.Sleep(100 * time.Millisecond) - v, _ = mgr.GetConfig("tmp.key") + _, v, _ = mgr.GetConfig("tmp.key") assert.Equal(t, "2", v) client.KV.Put(ctx, "test/config/tmp/key", "3") time.Sleep(100 * time.Millisecond) - v, _ = mgr.GetConfig("tmp.key") + _, v, _ = mgr.GetConfig("tmp.key") assert.Equal(t, "3", v) client.KV.Delete(ctx, "test/config/tmp/key") time.Sleep(100 * time.Millisecond) - v, _ = mgr.GetConfig("tmp.key") + _, v, _ = mgr.GetConfig("tmp.key") assert.Equal(t, "1", v) }) t.Run("multi priority", func(t *testing.T) { - v, _ := mgr.GetConfig("log.level") + _, v, _ := mgr.GetConfig("log.level") assert.Equal(t, "info", v) client.KV.Put(ctx, "test/config/log/level", "error") time.Sleep(100 * time.Millisecond) - v, _ = mgr.GetConfig("log.level") + _, v, _ = mgr.GetConfig("log.level") assert.Equal(t, "error", v) client.KV.Delete(ctx, "test/config/log/level") time.Sleep(100 * time.Millisecond) - v, _ = mgr.GetConfig("log.level") + _, v, _ = mgr.GetConfig("log.level") assert.Equal(t, "info", v) }) @@ -134,7 +134,7 @@ func TestConfigFromRemote(t *testing.T) { client.KV.Put(ctx, "test/config/test/etcd", "value2") assert.Eventually(t, func() bool { - _, err = mgr.GetConfig("test.etcd") + _, _, err = mgr.GetConfig("test.etcd") return err != nil && errors.Is(err, ErrKeyNotFound) }, 300*time.Millisecond, 10*time.Millisecond) }) diff --git a/pkg/config/manager.go b/pkg/config/manager.go index 85e02c159f..dc2ae3d674 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -28,7 +28,8 @@ import ( ) const ( - TombValue = "TOMB_VAULE" + TombValue = "TOMB_VAULE" + RuntimeSource = "RuntimeSource" ) type Filter func(key string) (string, bool) @@ -118,7 +119,7 @@ func (m *Manager) GetCachedValue(key string) (interface{}, bool) { func (m *Manager) CASCachedValue(key string, origin string, value interface{}) bool { m.cacheMutex.Lock() defer m.cacheMutex.Unlock() - current, err := m.GetConfig(key) + _, current, err := m.GetConfig(key) if errors.Is(err, ErrKeyNotFound) { m.configCache[key] = value return true @@ -147,20 +148,21 @@ func (m *Manager) EvictCacheValueByFormat(keys ...string) { clear(m.configCache) } -func (m *Manager) GetConfig(key string) (string, error) { +func (m *Manager) GetConfig(key string) (string, string, error) { realKey := formatKey(key) v, ok := m.overlays.Get(realKey) if ok { if v == TombValue { - return "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found %s", key) + return "", "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found %s", key) } - return v, nil + return RuntimeSource, v, nil } sourceName, ok := m.keySourceMap.Get(realKey) if !ok { - return "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found: %s", key) + return "", "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found: %s", key) } - return m.getConfigValueBySource(realKey, sourceName) + v, err := m.getConfigValueBySource(realKey, sourceName) + return sourceName, v, err } // GetConfigs returns all the key values @@ -168,7 +170,7 @@ func (m *Manager) GetConfigs() map[string]string { config := make(map[string]string) m.keySourceMap.Range(func(key, value string) bool { - sValue, err := m.GetConfig(key) + _, sValue, err := m.GetConfig(key) if err != nil { return true } @@ -185,15 +187,40 @@ func (m *Manager) GetConfigs() map[string]string { return config } +func (m *Manager) GetConfigsView() map[string]string { + config := make(map[string]string) + + valueFmt := func(source, value string) string { + return fmt.Sprintf("%s[%s]", value, source) + } + + m.keySourceMap.Range(func(key, value string) bool { + source, sValue, err := m.GetConfig(key) + if err != nil { + return true + } + + config[key] = valueFmt(source, sValue) + return true + }) + + m.overlays.Range(func(key, value string) bool { + config[key] = valueFmt(RuntimeSource, value) + return true + }) + + return config +} + func (m *Manager) GetBy(filters ...Filter) map[string]string { matchedConfig := make(map[string]string) - m.keySourceMap.Range(func(key, value string) bool { + m.keySourceMap.Range(func(key string, value string) bool { newkey, ok := filterate(key, filters...) if !ok { return true } - sValue, err := m.GetConfig(key) + _, sValue, err := m.GetConfig(key) if err != nil { return true } diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go index c2afac8233..a832c247f3 100644 --- a/pkg/config/manager_test.go +++ b/pkg/config/manager_test.go @@ -49,12 +49,12 @@ func TestConfigChangeEvent(t *testing.T) { mgr, _ := Init() err := mgr.AddSource(fs) assert.NoError(t, err) - res, err := mgr.GetConfig("a.b") + _, res, err := mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, res, "3") os.WriteFile(path.Join(dir, "user.yaml"), []byte("a.b: 6"), 0o600) time.Sleep(3 * time.Second) - res, err = mgr.GetConfig("a.b") + _, res, err = mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, res, "6") } @@ -78,10 +78,10 @@ func TestBasic(t *testing.T) { // test set config mgr.SetConfig("a.b", "aaa") - value, err := mgr.GetConfig("a.b") + _, value, err := mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, value, "aaa") - _, err = mgr.GetConfig("a.a") + _, _, err = mgr.GetConfig("a.a") assert.Error(t, err) // test delete config @@ -105,7 +105,7 @@ func TestBasic(t *testing.T) { Key: "ab", Value: "aaa", }) - value, err = mgr.GetConfig("a.b") + _, value, err = mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, value, "aaa") @@ -116,7 +116,7 @@ func TestBasic(t *testing.T) { Key: "a.b", Value: "bbb", }) - value, err = mgr.GetConfig("a.b") + _, value, err = mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, value, "aaa") @@ -149,7 +149,7 @@ func TestOnEvent(t *testing.T) { })) os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) assert.Eventually(t, func() bool { - value, err := mgr.GetConfig("a.b") + _, value, err := mgr.GetConfig("a.b") assert.NoError(t, err) return value == "aaa" }, time.Second*5, time.Second) @@ -158,33 +158,62 @@ func TestOnEvent(t *testing.T) { client.KV.Put(ctx, "test/config/a/b", "bbb") assert.Eventually(t, func() bool { - value, err := mgr.GetConfig("a.b") + _, value, err := mgr.GetConfig("a.b") assert.NoError(t, err) return value == "bbb" }, time.Second*5, time.Second) client.KV.Put(ctx, "test/config/a/b", "ccc") assert.Eventually(t, func() bool { - value, err := mgr.GetConfig("a.b") + _, value, err := mgr.GetConfig("a.b") assert.NoError(t, err) return value == "ccc" }, time.Second*5, time.Second) os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600) assert.Eventually(t, func() bool { - value, err := mgr.GetConfig("a.b") + _, value, err := mgr.GetConfig("a.b") assert.NoError(t, err) return value == "ccc" }, time.Second*5, time.Second) client.KV.Delete(ctx, "test/config/a/b") assert.Eventually(t, func() bool { - value, err := mgr.GetConfig("a.b") + _, value, err := mgr.GetConfig("a.b") assert.NoError(t, err) return value == "ddd" }, time.Second*5, time.Second) } +func TestGetConfigAndSource(t *testing.T) { + mgr, _ := Init() + envSource := NewEnvSource(formatKey) + err := mgr.AddSource(envSource) + assert.NoError(t, err) + + envSource.configs.Insert("ab-key", "ab-value") + mgr.OnEvent(&Event{ + EventSource: envSource.GetSourceName(), + EventType: CreateType, + Key: "ab-key", + }) + + mgr.SetConfig("ac-key", "ac-value") + _, value, err := mgr.GetConfig("ac-key") + assert.NoError(t, err) + assert.Equal(t, value, "ac-value") + + // test get all configs + configs := mgr.GetConfigsView() + v, ok := configs["ab-key"] + assert.True(t, ok) + assert.Contains(t, v, "EnvironmentSource") + + v, ok = configs["ac-key"] + assert.True(t, ok) + assert.Contains(t, v, RuntimeSource) +} + func TestDeadlock(t *testing.T) { mgr, _ := Init() diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index cded42ef32..a1f6099fc0 100644 --- a/pkg/util/metricsinfo/metrics_info.go +++ b/pkg/util/metricsinfo/metrics_info.go @@ -317,6 +317,7 @@ type IndexTaskStats struct { IndexVersion int64 `json:"index_version,omitempty,string"` CreatedUTCTime string `json:"create_time,omitempty"` FinishedUTCTime string `json:"finished_time,omitempty"` + NodeID int64 `json:"node_id,omitempty,string"` } type SyncTask struct { @@ -394,6 +395,7 @@ type CompactionTask struct { TotalRows int64 `json:"total_rows,omitempty,string"` InputSegments []string `json:"input_segments,omitempty"` ResultSegments []string `json:"result_segments,omitempty"` + NodeID int64 `json:"node_id,omitempty,string"` } // RootCoordConfiguration records the configuration of RootCoord. diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index 3d3bbd38b4..5fe618fd25 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -26,7 +26,7 @@ import ( "go.uber.org/zap" - config "github.com/milvus-io/milvus/pkg/config" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -252,7 +252,8 @@ func (bt *BaseTable) UpdateSourceOptions(opts ...config.Option) { // Load loads an object with @key. func (bt *BaseTable) Load(key string) (string, error) { - return bt.mgr.GetConfig(key) + _, v, err := bt.mgr.GetConfig(key) + return v, err } func (bt *BaseTable) Get(key string) string { @@ -265,7 +266,7 @@ func (bt *BaseTable) GetWithDefault(key, defaultValue string) string { return defaultValue } - str, err := bt.mgr.GetConfig(key) + _, str, err := bt.mgr.GetConfig(key) if err != nil { return defaultValue } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c62536f88c..7ae77e4741 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -175,6 +175,10 @@ func (p *ComponentParam) GetAll() map[string]string { return p.baseTable.mgr.GetConfigs() } +func (p *ComponentParam) GetConfigsView() map[string]string { + return p.baseTable.mgr.GetConfigsView() +} + func (p *ComponentParam) Watch(key string, watcher config.EventHandler) { p.baseTable.mgr.Dispatcher.Register(key, watcher) } diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index b8718b65ba..8d8f51b03e 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -70,14 +70,14 @@ func (pi *ParamItem) getWithRaw() (result, raw string, err error) { panic(fmt.Sprintf("manager is nil %s", pi.Key)) } // raw value set only once - raw, err = pi.manager.GetConfig(pi.Key) + _, raw, err = pi.manager.GetConfig(pi.Key) if err != nil || raw == pi.DefaultValue { // try fallback if the entry is not exist or default value, // because default value may already defined in milvus.yaml // and we don't want the fallback keys be overridden. for _, key := range pi.FallbackKeys { var fallbackRaw string - fallbackRaw, err = pi.manager.GetConfig(key) + _, fallbackRaw, err = pi.manager.GetConfig(key) if err == nil { raw = fallbackRaw break