From bef06e5acfe144d19cf56ee62e41bf1907c3d401 Mon Sep 17 00:00:00 2001 From: chyezh Date: Thu, 25 Jul 2024 12:19:44 +0800 Subject: [PATCH] enhance: add streaming coord and node grpc service register (#34655) issue: #33285 - register streaming coord service into datacoord. - add new streaming node role. - add global static switch to enable streaming service or not. Signed-off-by: chyezh --- cmd/components/streaming_node.go | 44 +++ cmd/milvus/util.go | 16 +- cmd/roles/roles.go | 31 +- internal/datacoord/server.go | 38 +- internal/distributed/datacoord/service.go | 18 +- internal/distributed/streamingnode/service.go | 345 ++++++++++++++++++ internal/mocks/mock_datacoord.go | 35 ++ internal/streamingcoord/server/builder.go | 48 +++ .../server/resource/resource.go | 4 +- .../server/resource/resource_test.go | 6 - internal/streamingcoord/server/server.go | 82 +++++ internal/streamingcoord/server/server_test.go | 42 +++ internal/streamingnode/server/builder.go | 71 ++++ .../streamingnode/server/resource/resource.go | 1 + .../server/resource/resource_test.go | 6 +- internal/streamingnode/server/server.go | 90 +++++ internal/types/types.go | 3 + .../util/componentutil/component_service.go | 89 +++++ .../componentutil/component_service_test.go | 46 +++ internal/util/streamingutil/checker.go | 16 + pkg/util/paramtable/component_param.go | 21 +- 21 files changed, 1017 insertions(+), 35 deletions(-) create mode 100644 cmd/components/streaming_node.go create mode 100644 internal/distributed/streamingnode/service.go create mode 100644 internal/streamingcoord/server/builder.go create mode 100644 internal/streamingcoord/server/server.go create mode 100644 internal/streamingcoord/server/server_test.go create mode 100644 internal/streamingnode/server/builder.go create mode 100644 internal/streamingnode/server/server.go create mode 100644 internal/util/componentutil/component_service.go create mode 100644 internal/util/componentutil/component_service_test.go create mode 100644 
internal/util/streamingutil/checker.go diff --git a/cmd/components/streaming_node.go b/cmd/components/streaming_node.go new file mode 100644 index 0000000000..2dad02154f --- /dev/null +++ b/cmd/components/streaming_node.go @@ -0,0 +1,44 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package components
+
+import (
+	"context"
+
+	"github.com/milvus-io/milvus/internal/distributed/streamingnode"
+	"github.com/milvus-io/milvus/internal/util/dependency"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+// StreamingNode is the component wrapper around the streamingnode grpc
+// server; it embeds *streamingnode.Server and so exposes its methods.
+type StreamingNode struct {
+	*streamingnode.Server
+}
+
+// NewStreamingNode builds a StreamingNode on top of a freshly created
+// streamingnode server. The context and the dependency factory are accepted
+// but not used. Any error from streamingnode.NewServer is returned as-is.
+func NewStreamingNode(_ context.Context, _ dependency.Factory) (*StreamingNode, error) {
+	server, err := streamingnode.NewServer()
+	if err != nil {
+		return nil, err
+	}
+	node := &StreamingNode{Server: server}
+	return node, nil
+}
+
+// GetName returns the role name of the streaming node component.
+func (n *StreamingNode) GetName() string {
+	return typeutil.StreamingNodeRole
+}
diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index 35068a6d32..e7dcb20353 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -21,6 +21,7 @@ import ( "github.com/milvus-io/milvus/cmd/roles" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -123,7 +124,7 @@ func removePidFile(lock *flock.Flock) { func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { alias, enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord, enableQueryNode, - enableDataNode, enableIndexNode, enableProxy := formatFlags(args, flags) + enableDataNode, enableIndexNode, enableProxy, enableStreamingNode := formatFlags(args, flags) serverType := args[2] role := roles.NewMilvusRoles() @@ -147,6 +148,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { role.EnableIndexCoord = true case typeutil.IndexNodeRole: role.EnableIndexNode = true + case typeutil.StreamingNodeRole: + streamingutil.MustEnableStreamingService() + role.EnableStreamingNode = true case typeutil.StandaloneRole, typeutil.EmbeddedRole: role.EnableRootCoord = true role.EnableProxy = true @@ -156,6 +160,9 @@ func GetMilvusRoles(args
[]string, flags *flag.FlagSet) *roles.MilvusRoles { role.EnableDataNode = true role.EnableIndexCoord = true role.EnableIndexNode = true + if streamingutil.IsStreamingServiceEnabled() { + role.EnableStreamingNode = true + } role.Local = true role.Embedded = serverType == typeutil.EmbeddedRole case typeutil.MixtureRole: @@ -167,6 +174,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { role.EnableDataNode = enableDataNode role.EnableIndexNode = enableIndexNode role.EnableProxy = enableProxy + role.EnableStreamingNode = enableStreamingNode default: fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp()) os.Exit(-1) @@ -177,6 +185,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord, enableQueryNode, enableDataNode, enableIndexNode, enableProxy bool, + enableStreamingNode bool, ) { flags.StringVar(&alias, "alias", "", "set alias") @@ -189,6 +198,11 @@ func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCo flags.BoolVar(&enableDataNode, typeutil.DataNodeRole, false, "enable data node") flags.BoolVar(&enableIndexNode, typeutil.IndexNodeRole, false, "enable index node") flags.BoolVar(&enableProxy, typeutil.ProxyRole, false, "enable proxy node") + flags.BoolVar(&enableStreamingNode, typeutil.StreamingNodeRole, false, "enable streaming node") + + if enableStreamingNode { + streamingutil.MustEnableStreamingService() + } serverType := args[2] if serverType == typeutil.EmbeddedRole { diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index c32d604c95..be698a6567 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -130,14 +130,15 @@ func runComponent[T component](ctx context.Context, // MilvusRoles decides which components are brought up with Milvus. 
type MilvusRoles struct { - EnableRootCoord bool `env:"ENABLE_ROOT_COORD"` - EnableProxy bool `env:"ENABLE_PROXY"` - EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"` - EnableQueryNode bool `env:"ENABLE_QUERY_NODE"` - EnableDataCoord bool `env:"ENABLE_DATA_COORD"` - EnableDataNode bool `env:"ENABLE_DATA_NODE"` - EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"` - EnableIndexNode bool `env:"ENABLE_INDEX_NODE"` + EnableRootCoord bool `env:"ENABLE_ROOT_COORD"` + EnableProxy bool `env:"ENABLE_PROXY"` + EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"` + EnableQueryNode bool `env:"ENABLE_QUERY_NODE"` + EnableDataCoord bool `env:"ENABLE_DATA_COORD"` + EnableDataNode bool `env:"ENABLE_DATA_NODE"` + EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"` + EnableIndexNode bool `env:"ENABLE_INDEX_NODE"` + EnableStreamingNode bool `env:"ENABLE_STREAMING_NODE"` Local bool Alias string @@ -207,6 +208,11 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync return runComponent(ctx, localMsg, wg, components.NewDataCoord, metrics.RegisterDataCoord) } +func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { + wg.Add(1) + return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode) +} + func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode) @@ -374,7 +380,7 @@ func (mr *MilvusRoles) Run() { componentMap := make(map[string]component) var rootCoord, queryCoord, indexCoord, dataCoord component - var proxy, dataNode, indexNode, queryNode component + var proxy, dataNode, indexNode, queryNode, streamingNode component if mr.EnableRootCoord { rootCoord = mr.runRootCoord(ctx, local, &wg) componentMap[typeutil.RootCoordRole] = rootCoord @@ -414,6 +420,11 @@ func (mr *MilvusRoles) Run() { componentMap[typeutil.ProxyRole] = proxy } + if 
mr.EnableStreamingNode { + streamingNode = mr.runStreamingNode(ctx, local, &wg) + componentMap[typeutil.StreamingNodeRole] = streamingNode + } + wg.Wait() http.RegisterStopComponent(func(role string) error { @@ -480,7 +491,7 @@ func (mr *MilvusRoles) Run() { log.Info("All coordinators have stopped") // stop nodes - nodes := []component{queryNode, indexNode, dataNode} + nodes := []component{queryNode, indexNode, dataNode, streamingNode} for idx, node := range nodes { if node != nil { log.Info("stop node", zap.Int("idx", idx), zap.Any("node", node)) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 6f7f16bbe4..f461c4c009 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/datacoord/broker" @@ -43,9 +44,11 @@ import ( "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" + streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -155,6 +158,9 @@ type Server struct { // manage ways that data coord access other coord broker broker.Broker + + // streamingcoord server is embedding in datacoord now. 
+ streamingCoord *streamingcoord.Server } type CollectionNameInfo struct { @@ -303,6 +309,11 @@ func (s *Server) Init() error { } s.startDataCoord() log.Info("DataCoord startup success") + + if s.streamingCoord != nil { + s.streamingCoord.Start() + log.Info("StreamingCoord stratup successfully at standby mode") + } return nil } s.stateCode.Store(commonpb.StateCode_StandBy) @@ -313,6 +324,10 @@ func (s *Server) Init() error { return s.initDataCoord() } +func (s *Server) RegisterStreamingCoordGRPCService(server *grpc.Server) { + s.streamingCoord.RegisterGRPCService(server) +} + func (s *Server) initDataCoord() error { s.stateCode.Store(commonpb.StateCode_Initializing) var err error @@ -334,6 +349,18 @@ func (s *Server) initDataCoord() error { return err } + // Initialize streaming coordinator. + if streamingutil.IsStreamingServiceEnabled() { + s.streamingCoord = streamingcoord.NewServerBuilder(). + WithETCD(s.etcdCli). + WithMetaKV(s.kv). + WithSession(s.session).Build() + if err = s.streamingCoord.Init(context.TODO()); err != nil { + return err + } + log.Info("init streaming coordinator done") + } + s.handler = newServerHandler(s) // check whether old node exist, if yes suspend auto balance until all old nodes down @@ -390,6 +417,10 @@ func (s *Server) Start() error { if !s.enableActiveStandBy { s.startDataCoord() log.Info("DataCoord startup successfully") + if s.streamingCoord != nil { + s.streamingCoord.Start() + log.Info("StreamingCoord stratup successfully") + } } return nil @@ -1009,6 +1040,12 @@ func (s *Server) Stop() error { s.garbageCollector.close() logutil.Logger(s.ctx).Info("datacoord garbage collector stopped") + if s.streamingCoord != nil { + log.Info("StreamingCoord stoping...") + s.streamingCoord.Stop() + log.Info("StreamingCoord stopped") + } + s.stopServerLoop() s.importScheduler.Close() @@ -1035,7 +1072,6 @@ func (s *Server) Stop() error { s.stopServerLoop() logutil.Logger(s.ctx).Info("datacoord serverloop stopped") 
logutil.Logger(s.ctx).Warn("datacoord stop successful") - return nil } diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 0b8aedce31..b59453eb0d 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -43,6 +43,8 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/streamingutil" + streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" @@ -120,14 +122,14 @@ func (s *Server) init() error { log.Info("Connected to tikv. Using tikv as metadata storage.") } - err = s.startGrpc() - if err != nil { - log.Debug("DataCoord startGrpc failed", zap.Error(err)) + if err := s.dataCoord.Init(); err != nil { + log.Error("dataCoord init error", zap.Error(err)) return err } - if err := s.dataCoord.Init(); err != nil { - log.Error("dataCoord init error", zap.Error(err)) + err = s.startGrpc() + if err != nil { + log.Debug("DataCoord startGrpc failed", zap.Error(err)) return err } return nil @@ -184,6 +186,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { } return s.serverID.Load() }), + streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( otelgrpc.StreamServerInterceptor(opts...), @@ -195,9 +198,14 @@ func (s *Server) startGrpcLoop(grpcPort int) { } return s.serverID.Load() }), + streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), ))) indexpb.RegisterIndexCoordServer(s.grpcServer, s) datapb.RegisterDataCoordServer(s.grpcServer, s) + // register the streaming coord grpc service. 
+ if streamingutil.IsStreamingServiceEnabled() { + s.dataCoord.RegisterStreamingCoordGRPCService(s.grpcServer) + } go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(lis); err != nil { s.grpcErrChan <- err diff --git a/internal/distributed/streamingnode/service.go b/internal/distributed/streamingnode/service.go new file mode 100644 index 0000000000..101af807f9 --- /dev/null +++ b/internal/distributed/streamingnode/service.go @@ -0,0 +1,345 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package streamingnode
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/cockroachdb/errors"
+	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
+	rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
+	streamingnodeserver "github.com/milvus-io/milvus/internal/streamingnode/server"
+	"github.com/milvus-io/milvus/internal/types"
+	"github.com/milvus-io/milvus/internal/util/componentutil"
+	kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
+	"github.com/milvus-io/milvus/internal/util/sessionutil"
+	streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/tracer"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/interceptor"
+	"github.com/milvus-io/milvus/pkg/util/logutil"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/retry"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+// Server is the grpc server of streamingnode.
+// It owns the listener, the grpc server, the etcd session and the coordinator
+// clients, and wires them into the inner streamingnode service.
+type Server struct {
+	// stopOnce makes sure the shutdown sequence in stop runs at most once,
+	// however many times Stop is called.
+	stopOnce sync.Once
+	// grpcServerChan is closed by the goroutine that runs grpcServer.Serve
+	// when Serve returns; stop blocks on it to wait for the grpc server.
+	grpcServerChan chan struct{}
+
+	// session of current server.
+	session *sessionutil.Session
+
+	// server
+	// streamingnode is the inner service built in init from the fields below.
+	streamingnode *streamingnodeserver.Server
+
+	// rpc
+	// grpcServer is created by initGRPCServer; lis is the tcp listener
+	// opened by allocateAddress and later handed to grpcServer.Serve.
+	grpcServer *grpc.Server
+	lis        net.Listener
+
+	// component client
+	// etcdCli comes from kvfactory.GetEtcdAndPath; rootCoord and dataCoord
+	// are created by initRootCoord and initDataCoord.
+	etcdCli   *clientv3.Client
+	rootCoord types.RootCoordClient
+	dataCoord types.DataCoordClient
+}
+
+// NewServer creates a new StreamingNode server.
+func NewServer() (*Server, error) {
+	return &Server{
+		stopOnce:       sync.Once{},
+		grpcServerChan: make(chan struct{}),
+	}, nil
+}
+
+// Run runs the server.
+// It executes init and then start, returning the first error encountered.
+func (s *Server) Run() error {
+	if err := s.init(); err != nil {
+		return err
+	}
+	log.Info("streamingnode init done ...")
+
+	if err := s.start(); err != nil {
+		return err
+	}
+	log.Info("streamingnode start done ...")
+	return nil
+}
+
+// Stop stops the server; it should be called after Run has returned.
+// The shutdown sequence runs at most once, and Stop always returns nil.
+func (s *Server) Stop() (err error) {
+	s.stopOnce.Do(s.stop)
+	return nil
+}
+
+// stop stops the server.
+// It marks the session as going to stop, gracefully stops the grpc server,
+// stops the inner service and the session, closes the coordinator clients
+// and finally waits for the grpc serving goroutine to exit.
+func (s *Server) stop() {
+	// The address is only used for logging, so a lookup error is ignored.
+	addr, _ := s.getAddress()
+	log.Info("streamingnode stop", zap.String("Address", addr))
+
+	// Unregister current server from etcd.
+	log.Info("streamingnode unregister session from etcd...")
+	if err := s.session.GoingStop(); err != nil {
+		log.Warn("streamingnode unregister session failed", zap.Error(err))
+	}
+
+	// Stop grpc server.
+	log.Info("streamingnode stop grpc server...")
+	s.grpcServer.GracefulStop()
+
+	// Stop StreamingNode service.
+	log.Info("streamingnode stop service...")
+	s.streamingnode.Stop()
+
+	// Stop all session
+	log.Info("streamingnode stop session...")
+	s.session.Stop()
+
+	// Stop rootCoord client.
+	log.Info("streamingnode stop rootCoord client...")
+	if err := s.rootCoord.Close(); err != nil {
+		log.Warn("streamingnode stop rootCoord client failed", zap.Error(err))
+	}
+
+	// Stop dataCoord client.
+	// It is created in initDataCoord just like the rootCoord client, so it
+	// has to be released here as well instead of being leaked.
+	log.Info("streamingnode stop dataCoord client...")
+	if err := s.dataCoord.Close(); err != nil {
+		log.Warn("streamingnode stop dataCoord client failed", zap.Error(err))
+	}
+
+	// Wait for grpc server to stop.
+	log.Info("wait for grpc server stop...")
+	<-s.grpcServerChan
+	log.Info("streamingnode stop done")
+}
+
+// Health check the health status of streamingnode.
+// The call is delegated to the inner streamingnode service.
+func (s *Server) Health(ctx context.Context) commonpb.StateCode {
+	return s.streamingnode.Health(ctx)
+}
+
+// init prepares everything the server needs before serving: the etcd client,
+// the listener, the session, the coordinator clients, the grpc server and
+// the inner streamingnode service. The outcome is logged by the deferred
+// function through the named result.
+func (s *Server) init() (err error) {
+	defer func() {
+		if err != nil {
+			log.Error("StreamingNode init failed", zap.Error(err))
+			return
+		}
+		log.Info("init StreamingNode server finished")
+	}()
+
+	// Create etcd client.
+	// The second return value of GetEtcdAndPath is not needed here.
+	s.etcdCli, _ = kvfactory.GetEtcdAndPath()
+
+	if err := s.allocateAddress(); err != nil {
+		return err
+	}
+	if err := s.initSession(); err != nil {
+		return err
+	}
+	if err := s.initRootCoord(); err != nil {
+		return err
+	}
+	if err := s.initDataCoord(); err != nil {
+		return err
+	}
+	s.initGRPCServer()
+
+	// Create StreamingNode service.
+	s.streamingnode = streamingnodeserver.NewServerBuilder().
+		WithETCD(s.etcdCli).
+		WithGRPCServer(s.grpcServer).
+		WithRootCoordClient(s.rootCoord).
+		WithDataCoordClient(s.dataCoord).
+		WithSession(s.session).
+		Build()
+	if err := s.streamingnode.Init(context.Background()); err != nil {
+		return errors.Wrap(err, "StreamingNode service init failed")
+	}
+	return nil
+}
+
+// start starts the inner service, then the grpc server, and finally
+// registers the session to etcd. The outcome is logged by the deferred
+// function through the named result.
+func (s *Server) start() (err error) {
+	defer func() {
+		if err != nil {
+			log.Error("StreamingNode start failed", zap.Error(err))
+			return
+		}
+		log.Info("start StreamingNode server finished")
+	}()
+
+	// Start StreamingNode service.
+	s.streamingnode.Start()
+
+	// Start grpc server.
+	if err := s.startGPRCServer(); err != nil {
+		return errors.Wrap(err, "StreamingNode start gRPC server fail")
+	}
+
+	// Register current server to etcd.
+	s.registerSessionToETCD()
+	return nil
+}
+
+// initSession creates the session for the StreamingNode role at the address
+// of the already opened listener and publishes the session's server id as
+// the process-wide node id.
+func (s *Server) initSession() error {
+	s.session = sessionutil.NewSession(context.Background())
+	if s.session == nil {
+		return errors.New("session is nil, the etcd client connection may have failed")
+	}
+	addr, err := s.getAddress()
+	if err != nil {
+		return err
+	}
+	// NOTE(review): the meaning of the two boolean arguments is defined by
+	// sessionutil.Session.Init and is not visible here.
+	s.session.Init(typeutil.StreamingNodeRole, addr, false, true)
+	paramtable.SetNodeID(s.session.ServerID)
+	log.Info("StreamingNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", addr))
+	return nil
+}
+
+// initRootCoord creates the RootCoord client and blocks until
+// WaitForComponentHealthy reports it ready or fails.
+func (s *Server) initRootCoord() (err error) {
+	log.Info("StreamingNode connect to rootCoord...")
+	s.rootCoord, err = rcc.NewClient(context.Background())
+	if err != nil {
+		return errors.Wrap(err, "StreamingNode try to new RootCoord client failed")
+	}
+
+	log.Info("StreamingNode try to wait for RootCoord ready")
+	// presumably 1000000 attempts at a 200ms interval; confirm against
+	// componentutil.WaitForComponentHealthy.
+	err = componentutil.WaitForComponentHealthy(context.Background(), s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
+	if err != nil {
+		return errors.Wrap(err, "StreamingNode wait for RootCoord ready failed")
+	}
+	return nil
+}
+
+// initDataCoord creates the DataCoord client and blocks until
+// WaitForComponentHealthy reports it ready or fails.
+func (s *Server) initDataCoord() (err error) {
+	log.Info("StreamingNode connect to dataCoord...")
+	s.dataCoord, err = dcc.NewClient(context.Background())
+	if err != nil {
+		return errors.Wrap(err, "StreamingNode try to new DataCoord client failed")
+	}
+
+	log.Info("StreamingNode try to wait for DataCoord ready")
+	// presumably 1000000 attempts at a 200ms interval; confirm against
+	// componentutil.WaitForComponentHealthy.
+	err = componentutil.WaitForComponentHealthy(context.Background(), s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
+	if err != nil {
+		return errors.Wrap(err, "StreamingNode wait for DataCoord ready failed")
+	}
+	return nil
+}
+
+// initGRPCServer creates the grpc server with keepalive settings, message
+// size limits from the StreamingNode grpc configuration, and the unary and
+// stream interceptor chains. It does not start serving.
+func (s *Server) initGRPCServer() {
+	log.Info("create StreamingNode server...")
+	cfg := &paramtable.Get().StreamingNodeGrpcServerCfg
+	kaep := keepalive.EnforcementPolicy{
+		MinTime:             5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
+		PermitWithoutStream: true,            // Allow pings even when there are no active streams
+	}
+	kasp := keepalive.ServerParameters{
+		Time:    60 * time.Second, // Ping the client if it is idle for 60 seconds to ensure the connection is still active
+		Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
+	}
+
+	// The getter is evaluated lazily by the interceptors, so it reads the
+	// session's server id at call time.
+	serverIDGetter := func() int64 {
+		return s.session.ServerID
+	}
+	opts := tracer.GetInterceptorOpts()
+	s.grpcServer = grpc.NewServer(
+		grpc.KeepaliveEnforcementPolicy(kaep),
+		grpc.KeepaliveParams(kasp),
+		grpc.MaxRecvMsgSize(cfg.ServerMaxRecvSize.GetAsInt()),
+		grpc.MaxSendMsgSize(cfg.ServerMaxSendSize.GetAsInt()),
+		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+			otelgrpc.UnaryServerInterceptor(opts...),
+			logutil.UnaryTraceLoggerInterceptor,
+			interceptor.ClusterValidationUnaryServerInterceptor(),
+			interceptor.ServerIDValidationUnaryServerInterceptor(serverIDGetter),
+			streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
+		)),
+		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+			otelgrpc.StreamServerInterceptor(opts...),
+			logutil.StreamTraceLoggerInterceptor,
+			interceptor.ClusterValidationStreamServerInterceptor(),
+			interceptor.ServerIDValidationStreamServerInterceptor(serverIDGetter),
+			streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
+		)))
+}
+
+// allocateAddress allocates an available address for streamingnode grpc server.
+// It first tries the configured port; when listening fails and the port is
+// non-zero it retries with port 0 so that the OS picks a free port. The
+// error left by the last listen attempt is returned.
+func (s *Server) allocateAddress() (err error) {
+	port := paramtable.Get().StreamingNodeGrpcServerCfg.Port.GetAsInt()
+
+	// The result of retry.Do is not used: the closure writes the outcome of
+	// the last attempt into the named result err.
+	retry.Do(context.Background(), func() error {
+		addr := ":" + strconv.Itoa(port)
+		s.lis, err = net.Listen("tcp", addr)
+		if err != nil {
+			if port != 0 {
+				// set port=0 to get next available port by os
+				log.Warn("StreamingNode suggested port is in used, try to get by os", zap.Error(err))
+				port = 0
+			}
+		}
+		return err
+	}, retry.Attempts(10))
+	return err
+}
+
+// getAddress returns the address of streamingnode grpc server.
+// must be called after allocateAddress.
+// It returns an error while the listener has not been created yet.
+func (s *Server) getAddress() (string, error) {
+	if s.lis == nil {
+		return "", errors.New("StreamingNode grpc server is not initialized")
+	}
+	ip := paramtable.Get().StreamingNodeGrpcServerCfg.IP
+	// The listener is opened with net.Listen("tcp", ...) in allocateAddress,
+	// so its address is expected to be a *net.TCPAddr.
+	return fmt.Sprintf("%s:%d", ip, s.lis.Addr().(*net.TCPAddr).Port), nil
+}
+
+// startGPRCServer starts the grpc server.
+// Serve runs in a background goroutine that closes grpcServerChan when it
+// returns. The function returns the first value that arrives on errCh:
+// either an early Serve error or the readiness signal from CheckGrpcReady.
+func (s *Server) startGPRCServer() error {
+	errCh := make(chan error, 1)
+	go func() {
+		defer close(s.grpcServerChan)
+
+		if err := s.grpcServer.Serve(s.lis); err != nil {
+			select {
+			case errCh <- err:
+				// failure at initial startup.
+			default:
+				// failure at runtime.
+				panic(errors.Wrapf(err, "grpc server stop with unexpected error"))
+			}
+		}
+	}()
+	// Run the readiness check in its own goroutine, the same way the
+	// datacoord grpc service does. errCh has capacity 1, so when Serve has
+	// already failed the buffer is full; a synchronous CheckGrpcReady would
+	// then block on its own send (presumably a blocking send after its
+	// readiness wait; confirm against funcutil.CheckGrpcReady) and the
+	// startup error would never be returned.
+	// NOTE(review): once the value below has been received the buffer is
+	// empty again, so a later Serve error takes the send case above rather
+	// than the panic branch.
+	go funcutil.CheckGrpcReady(context.Background(), errCh)
+	return <-errCh
+}
+
+// registerSessionToETCD registers current server to etcd.
+// It also starts the session liveness check, whose callback logs the
+// disconnection and exits the process with status 1.
+func (s *Server) registerSessionToETCD() {
+	s.session.Register()
+	// start liveness check
+	s.session.LivenessCheck(context.Background(), func() {
+		log.Error("StreamingNode disconnected from etcd, process will exit", zap.Int64("Server Id", paramtable.GetNodeID()))
+		os.Exit(1)
+	})
+}
diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index 67e01b55e9..5de170cfab 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -10,6 +10,8 @@ import ( datapb "github.com/milvus-io/milvus/internal/proto/datapb" + grpc "google.golang.org/grpc" + indexpb "github.com/milvus-io/milvus/internal/proto/indexpb" internalpb "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -2318,6 +2320,39 @@ func (_c *MockDataCoord_Register_Call) RunAndReturn(run func() error) *MockDataC return _c } +// RegisterStreamingCoordGRPCService provides a mock function with given fields: s +func (_m *MockDataCoord) RegisterStreamingCoordGRPCService(s *grpc.Server) { + _m.Called(s) +} + +// MockDataCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods
with type explicit version for method 'RegisterStreamingCoordGRPCService' +type MockDataCoord_RegisterStreamingCoordGRPCService_Call struct { + *mock.Call +} + +// RegisterStreamingCoordGRPCService is a helper method to define mock.On call +// - s *grpc.Server +func (_e *MockDataCoord_Expecter) RegisterStreamingCoordGRPCService(s interface{}) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { + return &MockDataCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", s)} +} + +func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(s *grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*grpc.Server)) + }) + return _c +} + +func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Return() *MockDataCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Return() + return _c +} + +func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Return(run) + return _c +} + // ReportDataNodeTtMsgs provides a mock function with given fields: _a0, _a1 func (_m *MockDataCoord) ReportDataNodeTtMsgs(_a0 context.Context, _a1 *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/streamingcoord/server/builder.go b/internal/streamingcoord/server/builder.go new file mode 100644 index 0000000000..b39908bf35 --- /dev/null +++ b/internal/streamingcoord/server/builder.go @@ -0,0 +1,48 @@ +package server + +import ( + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord" + "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" + "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/kv" + 
"github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type ServerBuilder struct { + etcdClient *clientv3.Client + metaKV kv.MetaKv + session sessionutil.SessionInterface +} + +func NewServerBuilder() *ServerBuilder { + return &ServerBuilder{} +} + +func (b *ServerBuilder) WithETCD(e *clientv3.Client) *ServerBuilder { + b.etcdClient = e + return b +} + +func (b *ServerBuilder) WithMetaKV(metaKV kv.MetaKv) *ServerBuilder { + b.metaKV = metaKV + return b +} + +func (b *ServerBuilder) WithSession(session sessionutil.SessionInterface) *ServerBuilder { + b.session = session + return b +} + +func (s *ServerBuilder) Build() *Server { + resource.Init( + resource.OptETCD(s.etcdClient), + resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)), + ) + return &Server{ + session: s.session, + componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole), + } +} diff --git a/internal/streamingcoord/server/resource/resource.go b/internal/streamingcoord/server/resource/resource.go index e6b991edf4..722a0342db 100644 --- a/internal/streamingcoord/server/resource/resource.go +++ b/internal/streamingcoord/server/resource/resource.go @@ -37,8 +37,8 @@ func Init(opts ...optResourceInit) { } assertNotNil(newR.ETCD()) assertNotNil(newR.StreamingCatalog()) - // TODO: after add streaming node manager client, remove this line. 
- // assertNotNil(r.StreamingNodeManagerClient()) + newR.streamingNodeManagerClient = manager.NewManagerClient(newR.etcdClient) + assertNotNil(newR.StreamingNodeManagerClient()) r = newR } diff --git a/internal/streamingcoord/server/resource/resource_test.go b/internal/streamingcoord/server/resource/resource_test.go index 2174835038..71cc750d95 100644 --- a/internal/streamingcoord/server/resource/resource_test.go +++ b/internal/streamingcoord/server/resource/resource_test.go @@ -21,12 +21,6 @@ func TestInit(t *testing.T) { mock_metastore.NewMockStreamingCoordCataLog(t), )) }) - Init(OptETCD(&clientv3.Client{}), OptStreamingCatalog( - mock_metastore.NewMockStreamingCoordCataLog(t), - )) - - assert.NotNil(t, Resource().StreamingCatalog()) - assert.NotNil(t, Resource().ETCD()) } func TestInitForTest(t *testing.T) { diff --git a/internal/streamingcoord/server/server.go b/internal/streamingcoord/server/server.go new file mode 100644 index 0000000000..231b9fd18f --- /dev/null +++ b/internal/streamingcoord/server/server.go @@ -0,0 +1,82 @@ +package server + +import ( + "context" + + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" + _ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy + "github.com/milvus-io/milvus/internal/streamingcoord/server/service" + "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" + "github.com/milvus-io/milvus/pkg/log" +) + +// Server is the streamingcoord server. +type Server struct { + // session of current server. + session sessionutil.SessionInterface + + // service level variables. + assignmentService service.AssignmentService + componentStateService *componentutil.ComponentStateService // state. 
+ + // basic component variables can be used at service level. + balancer balancer.Balancer +} + +// Init initializes the streamingcoord server. +func (s *Server) Init(ctx context.Context) (err error) { + log.Info("init streamingcoord server...") + s.componentStateService.OnInitializing() + + // Init all underlying components of the streamingcoord server. + if err := s.initBasicComponent(ctx); err != nil { + log.Error("init basic component of streamingcoord server failed", zap.Error(err)) + return err + } + // Init all grpc services of the streamingcoord server. + s.initService() + s.componentStateService.OnInitialized(s.session.GetServerID()) + log.Info("streamingcoord server initialized") + return nil +} + +// initBasicComponent initializes all underlying dependencies for streamingcoord. +func (s *Server) initBasicComponent(ctx context.Context) error { + // Init balancer + var err error + // Read new incoming topics from configuration, and register them into the balancer. + newIncomingTopics := util.GetAllTopicsFromConfiguration() + s.balancer, err = balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...) + return err +} + +// initService initializes the grpc service. +func (s *Server) initService() { + s.assignmentService = service.NewAssignmentService(s.balancer) +} + +// RegisterGRPCService registers all grpc services to the grpc server. +func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { + streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService) + streamingpb.RegisterStreamingCoordStateServiceServer(grpcServer, s.componentStateService) +} + +// Start starts the streamingcoord server. +func (s *Server) Start() { + // Just do nothing now. + log.Info("start streamingcoord server") +} + +// Stop stops the streamingcoord server. 
+func (s *Server) Stop() { + s.componentStateService.OnStopping() + log.Info("close balancer...") + s.balancer.Close() + log.Info("streamingcoord server stopped") +} diff --git a/internal/streamingcoord/server/server_test.go b/internal/streamingcoord/server/server_test.go new file mode 100644 index 0000000000..e14907dacd --- /dev/null +++ b/internal/streamingcoord/server/server_test.go @@ -0,0 +1,42 @@ +package server + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestServer(t *testing.T) { + paramtable.Init() + + params := paramtable.Get() + + endpoints := params.EtcdCfg.Endpoints.GetValue() + etcdEndpoints := strings.Split(endpoints, ",") + c, err := etcd.GetRemoteEtcdClient(etcdEndpoints) + assert.NoError(t, err) + assert.NotNil(t, c) + + b := NewServerBuilder() + metaKV := etcdkv.NewEtcdKV(c, "test") + s := sessionutil.NewMockSession(t) + s.EXPECT().GetServerID().Return(1) + + newServer := b.WithETCD(c). + WithMetaKV(metaKV). + WithSession(s). 
+ Build() + + ctx := context.Background() + err = newServer.Init(ctx) + assert.NoError(t, err) + newServer.Start() + newServer.Stop() +} diff --git a/internal/streamingnode/server/builder.go b/internal/streamingnode/server/builder.go new file mode 100644 index 0000000000..eeb5237fa9 --- /dev/null +++ b/internal/streamingnode/server/builder.go @@ -0,0 +1,71 @@ +package server + +import ( + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" + + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// ServerBuilder is used to build a server. +// All components that should be initialized before server initialization should be added here. +type ServerBuilder struct { + etcdClient *clientv3.Client + grpcServer *grpc.Server + rc types.RootCoordClient + dc types.DataCoordClient + session *sessionutil.Session +} + +// NewServerBuilder creates a new server builder. +func NewServerBuilder() *ServerBuilder { + return &ServerBuilder{} +} + +// WithETCD sets etcd client to the server builder. +func (b *ServerBuilder) WithETCD(e *clientv3.Client) *ServerBuilder { + b.etcdClient = e + return b +} + +// WithGRPCServer sets grpc server to the server builder. +func (b *ServerBuilder) WithGRPCServer(svr *grpc.Server) *ServerBuilder { + b.grpcServer = svr + return b +} + +// WithRootCoordClient sets root coord client to the server builder. +func (b *ServerBuilder) WithRootCoordClient(rc types.RootCoordClient) *ServerBuilder { + b.rc = rc + return b +} + +// WithDataCoordClient sets data coord client to the server builder. +func (b *ServerBuilder) WithDataCoordClient(dc types.DataCoordClient) *ServerBuilder { + b.dc = dc + return b +} + +// WithSession sets session to the server builder. 
+func (b *ServerBuilder) WithSession(session *sessionutil.Session) *ServerBuilder { + b.session = session + return b +} + +// Build builds a streaming node server. +func (s *ServerBuilder) Build() *Server { + resource.Init( + resource.OptETCD(s.etcdClient), + resource.OptRootCoordClient(s.rc), + resource.OptDataCoordClient(s.dc), + ) + return &Server{ + session: s.session, + grpcServer: s.grpcServer, + componentStateService: componentutil.NewComponentStateService(typeutil.StreamingNodeRole), + } +} diff --git a/internal/streamingnode/server/resource/resource.go b/internal/streamingnode/server/resource/resource.go index ed72fbeaba..5243f78628 100644 --- a/internal/streamingnode/server/resource/resource.go +++ b/internal/streamingnode/server/resource/resource.go @@ -48,6 +48,7 @@ func Init(opts ...optResourceInit) { assertNotNil(r.TSOAllocator()) assertNotNil(r.ETCD()) assertNotNil(r.RootCoordClient()) + assertNotNil(r.DataCoordClient()) } // Resource access the underlying singleton of resources. 
diff --git a/internal/streamingnode/server/resource/resource_test.go b/internal/streamingnode/server/resource/resource_test.go index 7c84d920de..6aaed02e34 100644 --- a/internal/streamingnode/server/resource/resource_test.go +++ b/internal/streamingnode/server/resource/resource_test.go @@ -19,7 +19,11 @@ func TestInit(t *testing.T) { assert.Panics(t, func() { Init(OptRootCoordClient(mocks.NewMockRootCoordClient(t))) }) - Init(OptETCD(&clientv3.Client{}), OptRootCoordClient(mocks.NewMockRootCoordClient(t))) + Init( + OptETCD(&clientv3.Client{}), + OptRootCoordClient(mocks.NewMockRootCoordClient(t)), + OptDataCoordClient(mocks.NewMockDataCoordClient(t)), + ) assert.NotNil(t, Resource().TSOAllocator()) assert.NotNil(t, Resource().ETCD()) diff --git a/internal/streamingnode/server/server.go b/internal/streamingnode/server/server.go new file mode 100644 index 0000000000..6f1fde93fc --- /dev/null +++ b/internal/streamingnode/server/server.go @@ -0,0 +1,90 @@ +package server + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/service" + "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" + "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/log" + _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar" + _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq" +) + +// Server is the streamingnode server. +type Server struct { + // session of current server. + session *sessionutil.Session + grpcServer *grpc.Server + + // service level instances. + handlerService service.HandlerService + managerService service.ManagerService + componentStateService *componentutil.ComponentStateService // state. 
+ + // basic component instances. + walManager walmanager.Manager +} + +// Init initializes the streamingnode server. +func (s *Server) Init(ctx context.Context) (err error) { + log.Info("init streamingnode server...") + s.componentStateService.OnInitializing() + // init all basic components. + s.initBasicComponent(ctx) + + // init all services. + s.initService(ctx) + log.Info("streamingnode server initialized") + s.componentStateService.OnInitialized(s.session.ServerID) + return nil +} + +// Start starts the streamingnode server. +func (s *Server) Start() { + // Just do nothing now. +} + +// Stop stops the streamingnode server. +func (s *Server) Stop() { + log.Info("stopping streamingnode server...") + s.componentStateService.OnStopping() + log.Info("close wal manager...") + s.walManager.Close() + log.Info("streamingnode server stopped") +} + +// Health returns the health status of the streamingnode server. +func (s *Server) Health(ctx context.Context) commonpb.StateCode { + resp, _ := s.componentStateService.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + return resp.State.StateCode +} + +// initBasicComponent initializes all underlying dependencies for streamingnode. +func (s *Server) initBasicComponent(_ context.Context) { + var err error + s.walManager, err = walmanager.OpenManager() + if err != nil { + panic("open wal manager failed") + } +} + +// initService initializes the grpc service. +func (s *Server) initService(_ context.Context) { + s.handlerService = service.NewHandlerService(s.walManager) + s.managerService = service.NewManagerService(s.walManager) + s.registerGRPCService(s.grpcServer) +} + +// registerGRPCService registers all grpc services to the grpc server. 
+func (s *Server) registerGRPCService(grpcServer *grpc.Server) { + streamingpb.RegisterStreamingNodeHandlerServiceServer(grpcServer, s.handlerService) + streamingpb.RegisterStreamingNodeManagerServiceServer(grpcServer, s.managerService) + streamingpb.RegisterStreamingNodeStateServiceServer(grpcServer, s.componentStateService) +} diff --git a/internal/types/types.go b/internal/types/types.go index 93c85dc9e7..b481e93061 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -22,6 +22,7 @@ import ( "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -118,6 +119,8 @@ type DataCoord interface { type DataCoordComponent interface { DataCoord + RegisterStreamingCoordGRPCService(s *grpc.Server) + SetAddress(address string) // SetEtcdClient set EtcdClient for DataCoord // `etcdClient` is a client of etcd diff --git a/internal/util/componentutil/component_service.go b/internal/util/componentutil/component_service.go new file mode 100644 index 0000000000..99303aff27 --- /dev/null +++ b/internal/util/componentutil/component_service.go @@ -0,0 +1,89 @@ +package componentutil + +import ( + "context" + "sync" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +// NewComponentStateService create a ComponentStateService +func NewComponentStateService(role string) *ComponentStateService { + return &ComponentStateService{ + nodeID: common.NotRegisteredID, + role: role, + stateCode: commonpb.StateCode_StandBy, + } +} + +// ComponentStateService is a helper type to implement a GetComponentStates rpc at server side. 
+// StandBy -> Initializing -> Healthy -> Abnormal -> Healthy +// All can transfer into Stopping +type ComponentStateService struct { + mu sync.Mutex + nodeID int64 + role string + stateCode commonpb.StateCode +} + +// OnInitializing set the state to initializing +func (s *ComponentStateService) OnInitializing() { + s.mu.Lock() + defer s.mu.Unlock() + if s.stateCode != commonpb.StateCode_StandBy { + panic("standby -> initializing") + } + s.stateCode = commonpb.StateCode_Initializing +} + +func (s *ComponentStateService) OnInitialized(nodeID int64) { + s.mu.Lock() + defer s.mu.Unlock() + s.nodeID = nodeID + if s.stateCode != commonpb.StateCode_Initializing { + panic("initializing -> healthy") + } + s.stateCode = commonpb.StateCode_Healthy +} + +func (s *ComponentStateService) OnHealthy() { + s.mu.Lock() + if s.stateCode == commonpb.StateCode_Abnormal { + s.stateCode = commonpb.StateCode_Healthy + } + s.mu.Unlock() +} + +func (s *ComponentStateService) OnAbnormal() { + s.mu.Lock() + if s.stateCode == commonpb.StateCode_Healthy { + s.stateCode = commonpb.StateCode_Abnormal + } + s.mu.Unlock() +} + +func (s *ComponentStateService) OnStopping() { + s.mu.Lock() + s.stateCode = commonpb.StateCode_Stopping + s.mu.Unlock() +} + +// GetComponentStates get the component state of a milvus node. 
+func (s *ComponentStateService) GetComponentStates(ctx context.Context, _ *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { + s.mu.Lock() + code := s.stateCode + nodeID := s.nodeID + s.mu.Unlock() + + return &milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + NodeID: nodeID, + Role: s.role, + StateCode: code, + }, + Status: merr.Status(nil), + }, nil +} diff --git a/internal/util/componentutil/component_service_test.go b/internal/util/componentutil/component_service_test.go new file mode 100644 index 0000000000..0a8267c8ec --- /dev/null +++ b/internal/util/componentutil/component_service_test.go @@ -0,0 +1,46 @@ +package componentutil + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" +) + +func TestComponentStateService(t *testing.T) { + ctx := context.Background() + s := NewComponentStateService("role") + resp, err := s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_StandBy, resp.State.StateCode) + + s.OnInitializing() + resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Initializing, resp.State.StateCode) + + s.OnInitialized(1) + resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Healthy, resp.State.StateCode) + assert.Equal(t, "role", resp.State.Role) + assert.Equal(t, int64(1), resp.State.NodeID) + + s.OnAbnormal() + resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Abnormal, resp.State.StateCode) + + s.OnHealthy() + resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + assert.NoError(t, err) + assert.Equal(t, 
commonpb.StateCode_Healthy, resp.State.StateCode) + + s.OnStopping() + resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Stopping, resp.State.StateCode) +} diff --git a/internal/util/streamingutil/checker.go b/internal/util/streamingutil/checker.go new file mode 100644 index 0000000000..bc9169f489 --- /dev/null +++ b/internal/util/streamingutil/checker.go @@ -0,0 +1,16 @@ +package streamingutil + +import "os" + +// IsStreamingServiceEnabled returns whether the streaming service is enabled. +func IsStreamingServiceEnabled() bool { + // TODO: check if the environment variable MILVUS_STREAMING_SERVICE_ENABLED is set + return os.Getenv("MILVUS_STREAMING_SERVICE_ENABLED") == "1" +} + +// MustEnableStreamingService panics if the streaming service is not enabled. +func MustEnableStreamingService() { + if !IsStreamingServiceEnabled() { + panic("start a streaming node without enabling streaming service, please set environment variable MILVUS_STREAMING_SERVICE_ENABLED = 1") + } +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 7dab60e8a6..6f420c4fbc 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -82,13 +82,14 @@ type ComponentParam struct { StreamingCoordCfg streamingCoordConfig StreamingNodeCfg streamingNodeConfig - RootCoordGrpcServerCfg GrpcServerConfig - ProxyGrpcServerCfg GrpcServerConfig - QueryCoordGrpcServerCfg GrpcServerConfig - QueryNodeGrpcServerCfg GrpcServerConfig - DataCoordGrpcServerCfg GrpcServerConfig - DataNodeGrpcServerCfg GrpcServerConfig - IndexNodeGrpcServerCfg GrpcServerConfig + RootCoordGrpcServerCfg GrpcServerConfig + ProxyGrpcServerCfg GrpcServerConfig + QueryCoordGrpcServerCfg GrpcServerConfig + QueryNodeGrpcServerCfg GrpcServerConfig + DataCoordGrpcServerCfg GrpcServerConfig + DataNodeGrpcServerCfg GrpcServerConfig + IndexNodeGrpcServerCfg GrpcServerConfig 
+ StreamingNodeGrpcServerCfg GrpcServerConfig RootCoordGrpcClientCfg GrpcClientConfig ProxyGrpcClientCfg GrpcClientConfig @@ -99,8 +100,7 @@ type ComponentParam struct { IndexNodeGrpcClientCfg GrpcClientConfig StreamingCoordGrpcClientCfg GrpcClientConfig StreamingNodeGrpcClientCfg GrpcClientConfig - - IntegrationTestCfg integrationTestConfig + IntegrationTestCfg integrationTestConfig RuntimeConfig runtimeConfig } @@ -130,6 +130,8 @@ func (p *ComponentParam) init(bt *BaseTable) { p.DataCoordCfg.init(bt) p.DataNodeCfg.init(bt) p.IndexNodeCfg.init(bt) + p.StreamingCoordCfg.init(bt) + p.StreamingNodeCfg.init(bt) p.HTTPCfg.init(bt) p.LogCfg.init(bt) p.RoleCfg.init(bt) @@ -145,6 +147,7 @@ func (p *ComponentParam) init(bt *BaseTable) { p.DataCoordGrpcServerCfg.Init("dataCoord", bt) p.DataNodeGrpcServerCfg.Init("dataNode", bt) p.IndexNodeGrpcServerCfg.Init("indexNode", bt) + p.StreamingNodeGrpcServerCfg.Init("streamingNode", bt) p.RootCoordGrpcClientCfg.Init("rootCoord", bt) p.ProxyGrpcClientCfg.Init("proxy", bt)