diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 5f355436fc..02543e6ea8 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -23,7 +23,6 @@ import ( "os/signal" "path/filepath" "runtime/debug" - "strings" "sync" "syscall" "time" @@ -173,13 +172,6 @@ func NewMilvusRoles() *MilvusRoles { return mr } -// EnvValue not used now. -func (mr *MilvusRoles) EnvValue(env string) bool { - env = strings.ToLower(env) - env = strings.Trim(env, " ") - return env == "1" || env == "true" -} - func (mr *MilvusRoles) printLDPreLoad() { const LDPreLoad = "LD_PRELOAD" val, ok := os.LookupEnv(LDPreLoad) @@ -449,45 +441,55 @@ func (mr *MilvusRoles) Run() { if mr.EnableRootCoord { rootCoord = mr.runRootCoord(ctx, local, &wg) componentMap[typeutil.RootCoordRole] = rootCoord + paramtable.SetLocalComponentEnabled(typeutil.RootCoordRole) } if mr.EnableDataCoord { dataCoord = mr.runDataCoord(ctx, local, &wg) componentMap[typeutil.DataCoordRole] = dataCoord + paramtable.SetLocalComponentEnabled(typeutil.DataCoordRole) } if mr.EnableIndexCoord { indexCoord = mr.runIndexCoord(ctx, local, &wg) componentMap[typeutil.IndexCoordRole] = indexCoord + paramtable.SetLocalComponentEnabled(typeutil.IndexCoordRole) } if mr.EnableQueryCoord { queryCoord = mr.runQueryCoord(ctx, local, &wg) componentMap[typeutil.QueryCoordRole] = queryCoord + paramtable.SetLocalComponentEnabled(typeutil.QueryCoordRole) } if mr.EnableQueryNode { queryNode = mr.runQueryNode(ctx, local, &wg) componentMap[typeutil.QueryNodeRole] = queryNode + paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole) } if mr.EnableDataNode { dataNode = mr.runDataNode(ctx, local, &wg) componentMap[typeutil.DataNodeRole] = dataNode + paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole) } if mr.EnableIndexNode { indexNode = mr.runIndexNode(ctx, local, &wg) componentMap[typeutil.IndexNodeRole] = indexNode + paramtable.SetLocalComponentEnabled(typeutil.IndexNodeRole) } if mr.EnableProxy { proxy = mr.runProxy(ctx, local, &wg) componentMap[typeutil.ProxyRole] = proxy + paramtable.SetLocalComponentEnabled(typeutil.ProxyRole) } if mr.EnableStreamingNode { + // Before initializing the local streaming node, make sure the local registry is ready. streamingNode = mr.runStreamingNode(ctx, local, &wg) componentMap[typeutil.StreamingNodeRole] = streamingNode + paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole) } wg.Wait() diff --git a/cmd/roles/roles_test.go b/cmd/roles/roles_test.go index 69bbd18b46..f577639033 100644 --- a/cmd/roles/roles_test.go +++ b/cmd/roles/roles_test.go @@ -28,19 +28,6 @@ import ( ) func TestRoles(t *testing.T) { - r := MilvusRoles{} - - assert.True(t, r.EnvValue("1")) - assert.True(t, r.EnvValue(" 1 ")) - assert.True(t, r.EnvValue("True")) - assert.True(t, r.EnvValue(" True ")) - assert.True(t, r.EnvValue(" TRue ")) - assert.False(t, r.EnvValue("0")) - assert.False(t, r.EnvValue(" 0 ")) - assert.False(t, r.EnvValue(" false ")) - assert.False(t, r.EnvValue(" False ")) - assert.False(t, r.EnvValue(" abc ")) - ss := strings.SplitN("abcdef", "=", 2) assert.Equal(t, len(ss), 1) ss = strings.SplitN("adb=def", "=", 2) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 3f87234823..c76ef7f233 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -26,6 +26,7 @@ import ( "time" "github.com/shirou/gopsutil/v3/disk" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/config" @@ -104,8 +105,6 @@ type ComponentParam struct { StreamingCoordGrpcClientCfg GrpcClientConfig StreamingNodeGrpcClientCfg GrpcClientConfig IntegrationTestCfg integrationTestConfig - - RuntimeConfig runtimeConfig } // Init initialize once @@ -4850,11 +4849,13 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura p.TxnDefaultKeepaliveTimeout.Init(base.mgr) } +// runtimeConfig is just a private environment value table. type runtimeConfig struct { - CreateTime RuntimeParamItem - UpdateTime RuntimeParamItem - Role RuntimeParamItem - NodeID RuntimeParamItem + createTime time.Time + updateTime time.Time + role string + nodeID atomic.Int64 + components map[string]struct{} } type integrationTestConfig struct { diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index b8718b65ba..93be87ce7c 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -396,39 +396,3 @@ func getAndConvert[T any](v string, converter func(input string) (T, error), def } return t } - -type RuntimeParamItem struct { - value atomic.Value -} - -func (rpi *RuntimeParamItem) GetValue() any { - return rpi.value.Load() -} - -func (rpi *RuntimeParamItem) GetAsString() string { - value, ok := rpi.value.Load().(string) - if !ok { - return "" - } - return value -} - -func (rpi *RuntimeParamItem) GetAsTime() time.Time { - value, ok := rpi.value.Load().(time.Time) - if !ok { - return time.Time{} - } - return value -} - -func (rpi *RuntimeParamItem) GetAsInt64() int64 { - value, ok := rpi.value.Load().(int64) - if !ok { - return 0 - } - return value -} - -func (rpi *RuntimeParamItem) SetValue(value any) { - rpi.value.Store(value) -} diff --git a/pkg/util/paramtable/runtime.go b/pkg/util/paramtable/runtime.go index 7d9b67aed0..a961c5be63 100644 --- a/pkg/util/paramtable/runtime.go +++ b/pkg/util/paramtable/runtime.go @@ -23,8 +23,11 @@ import ( ) var ( - once sync.Once - params ComponentParam + once sync.Once + params ComponentParam + runtimeParam = runtimeConfig{ + components: make(map[string]struct{}, 0), + } hookParams hookConfig ) @@ -58,11 +61,11 @@ func GetHookParams() *hookConfig { } func SetNodeID(newID UniqueID) { - params.RuntimeConfig.NodeID.SetValue(newID) + runtimeParam.nodeID.Store(newID) } func GetNodeID() UniqueID { - return params.RuntimeConfig.NodeID.GetAsInt64() + return runtimeParam.nodeID.Load() } func GetStringNodeID() string { @@ -70,25 +73,34 @@ func GetStringNodeID() string { } func SetRole(role string) { - params.RuntimeConfig.Role.SetValue(role) + runtimeParam.role = role } func GetRole() string { - return params.RuntimeConfig.Role.GetAsString() + return runtimeParam.role } func SetCreateTime(d time.Time) { - params.RuntimeConfig.CreateTime.SetValue(d) + runtimeParam.createTime = d } func GetCreateTime() time.Time { - return params.RuntimeConfig.CreateTime.GetAsTime() + return runtimeParam.createTime } func SetUpdateTime(d time.Time) { - params.RuntimeConfig.UpdateTime.SetValue(d) + runtimeParam.updateTime = d } func GetUpdateTime() time.Time { - return params.RuntimeConfig.UpdateTime.GetAsTime() + return runtimeParam.updateTime +} + +func SetLocalComponentEnabled(component string) { + runtimeParam.components[component] = struct{}{} +} + +func IsLocalComponentEnabled(component string) bool { + _, ok := runtimeParam.components[component] + return ok } diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 404ef45e6d..e8f8584e48 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestServiceParam(t *testing.T) { @@ -221,3 +222,14 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, 10000, Params.PaginationSize.GetAsInt()) }) } + +func TestRuntimConfig(t *testing.T) { + SetRole(typeutil.StandaloneRole) + assert.Equal(t, GetRole(), typeutil.StandaloneRole) + + SetLocalComponentEnabled(typeutil.QueryNodeRole) + assert.True(t, IsLocalComponentEnabled(typeutil.QueryNodeRole)) + + SetLocalComponentEnabled(typeutil.QueryCoordRole) + assert.True(t, IsLocalComponentEnabled(typeutil.QueryCoordRole)) +}