enhance: always enable streaming service (#40253)

issue: #38399

Signed-off-by: chyezh <chyezh@outlook.com>
pull/40271/head
Zhen Ye 2025-02-28 15:38:01 +08:00 committed by GitHub
parent 0a4e7b5116
commit bc8e02df44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 16 additions and 13 deletions

View File

@ -28,10 +28,15 @@ import (
"github.com/milvus-io/milvus/cmd/asan"
"github.com/milvus-io/milvus/cmd/milvus"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func main() {
// after 2.6.0, we enable streaming service by default.
// TODO: after remove all streamingutil.IsStreamingServiceEnabled(), we can remove this code.
streamingutil.SetStreamingServiceEnabled()
defer asan.LsanDoLeakCheck()
idx := slices.Index(os.Args, "--run-with-subprocess")

View File

@ -149,7 +149,6 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StreamingNodeRole:
streamingutil.MustEnableStreamingService()
streamingutil.EnableEmbededQueryNode()
role.EnableStreamingNode = true
role.EnableQueryNode = true
@ -206,10 +205,6 @@ func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCo
flags.BoolVar(&enableProxy, typeutil.ProxyRole, false, "enable proxy node")
flags.BoolVar(&enableStreamingNode, typeutil.StreamingNodeRole, false, "enable streaming node")
if enableStreamingNode {
streamingutil.MustEnableStreamingService()
}
serverType := args[2]
if serverType == typeutil.EmbeddedRole {
flags.SetOutput(io.Discard)

View File

@ -14,6 +14,14 @@ func IsStreamingServiceEnabled() bool {
return os.Getenv(MilvusStreamingServiceEnabled) == "1"
}
// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled.
func SetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "1")
if err != nil {
panic(err)
}
}
// MustEnableStreamingService panics if the streaming service is not enabled.
func MustEnableStreamingService() {
if !IsStreamingServiceEnabled() {

View File

@ -5,14 +5,6 @@ package streamingutil
import "os"
// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled.
func SetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "1")
if err != nil {
panic(err)
}
}
// UnsetStreamingServiceEnabled unsets the env that indicates whether the streaming service is enabled.
func UnsetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "0")

View File

@ -50,3 +50,6 @@ nohup ./bin/milvus run indexcoord --run-with-subprocess > /tmp/indexcoord.log
echo "Starting indexnode..."
nohup ./bin/milvus run indexnode --run-with-subprocess > /tmp/indexnode.log 2>&1 &
echo "Starting streamingnode..."
nohup ./bin/milvus run streamingnode --run-with-subprocess > /tmp/streamingnode.log 2>&1 &