From bc8e02df44f28c4b329a5817b46d17d450aa12de Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 28 Feb 2025 15:38:01 +0800 Subject: [PATCH] enhance: always enable streaming service (#40253) issue: #38399 Signed-off-by: chyezh --- cmd/main.go | 5 +++++ cmd/milvus/util.go | 5 ----- internal/util/streamingutil/env.go | 8 ++++++++ internal/util/streamingutil/test_env.go | 8 -------- scripts/start_cluster.sh | 3 +++ 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index e5a9199f42..992d99e737 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -28,10 +28,15 @@ import ( "github.com/milvus-io/milvus/cmd/asan" "github.com/milvus-io/milvus/cmd/milvus" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) func main() { + // after 2.6.0, we enable streaming service by default. + // TODO: after remove all streamingutil.IsStreamingServiceEnabled(), we can remove this code. + streamingutil.SetStreamingServiceEnabled() + defer asan.LsanDoLeakCheck() idx := slices.Index(os.Args, "--run-with-subprocess") diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index 052f750226..7393638d6a 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -149,7 +149,6 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { case typeutil.IndexNodeRole: role.EnableIndexNode = true case typeutil.StreamingNodeRole: - streamingutil.MustEnableStreamingService() streamingutil.EnableEmbededQueryNode() role.EnableStreamingNode = true role.EnableQueryNode = true @@ -206,10 +205,6 @@ func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCo flags.BoolVar(&enableProxy, typeutil.ProxyRole, false, "enable proxy node") flags.BoolVar(&enableStreamingNode, typeutil.StreamingNodeRole, false, "enable streaming node") - if enableStreamingNode { - streamingutil.MustEnableStreamingService() - } - serverType := args[2] if serverType == typeutil.EmbeddedRole { flags.SetOutput(io.Discard) diff --git a/internal/util/streamingutil/env.go b/internal/util/streamingutil/env.go index 6a160f42d0..101fe1d53c 100644 --- a/internal/util/streamingutil/env.go +++ b/internal/util/streamingutil/env.go @@ -14,6 +14,14 @@ func IsStreamingServiceEnabled() bool { return os.Getenv(MilvusStreamingServiceEnabled) == "1" } +// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled. +func SetStreamingServiceEnabled() { + err := os.Setenv(MilvusStreamingServiceEnabled, "1") + if err != nil { + panic(err) + } +} + // MustEnableStreamingService panics if the streaming service is not enabled. func MustEnableStreamingService() { if !IsStreamingServiceEnabled() { diff --git a/internal/util/streamingutil/test_env.go b/internal/util/streamingutil/test_env.go index d0c5a5237e..82048ba811 100644 --- a/internal/util/streamingutil/test_env.go +++ b/internal/util/streamingutil/test_env.go @@ -5,14 +5,6 @@ package streamingutil import "os" -// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled. -func SetStreamingServiceEnabled() { - err := os.Setenv(MilvusStreamingServiceEnabled, "1") - if err != nil { - panic(err) - } -} - // UnsetStreamingServiceEnabled unsets the env that indicates whether the streaming service is enabled. func UnsetStreamingServiceEnabled() { err := os.Setenv(MilvusStreamingServiceEnabled, "0") diff --git a/scripts/start_cluster.sh b/scripts/start_cluster.sh index b40ceacf7c..cbad892960 100755 --- a/scripts/start_cluster.sh +++ b/scripts/start_cluster.sh @@ -50,3 +50,6 @@ nohup ./bin/milvus run indexcoord --run-with-subprocess > /tmp/indexcoord.log echo "Starting indexnode..." nohup ./bin/milvus run indexnode --run-with-subprocess > /tmp/indexnode.log 2>&1 & + +echo "Starting streamingnode..." +nohup ./bin/milvus run streamingnode --run-with-subprocess > /tmp/streamingnode.log 2>&1 &